Merge branch 'master' into wip-2006-binary-compat-√

This commit is contained in:
Viktor Klang 2012-05-21 14:37:42 +02:00
commit 3ba5db053a
30 changed files with 4059 additions and 515 deletions

View file

@ -300,8 +300,9 @@ abstract class ActorSystem extends ActorRefFactory {
* Default dispatcher as configured. This dispatcher is used for all actors
* in the actor system which do not have a different dispatcher configured
* explicitly.
* Importing this member will place the default MessageDispatcher in scope.
*/
def dispatcher: MessageDispatcher
implicit def dispatcher: MessageDispatcher
/**
* Register a block of code (callback) to run after all actors in this actor system have

View file

@ -138,7 +138,7 @@ case class Props(
* Java API.
*/
def this(actorClass: Class[_ <: Actor]) = this(
creator = () actorClass.newInstance,
creator = FromClassCreator(actorClass),
dispatcher = Dispatchers.DefaultDispatcherId,
routerConfig = Props.defaultRoutedProps)
@ -161,7 +161,7 @@ case class Props(
*
* Java API.
*/
def withCreator(c: Class[_ <: Actor]): Props = copy(creator = () c.newInstance)
def withCreator(c: Class[_ <: Actor]): Props = copy(creator = FromClassCreator(c))
/**
* Returns a new Props with the specified dispatcher set.
@ -177,4 +177,13 @@ case class Props(
* Returns a new Props with the specified deployment configuration.
*/
def withDeploy(d: Deploy): Props = copy(deploy = d)
}
/**
* Used when creating an Actor from a class. Special Function0 to be
* able to optimize serialization.
*/
private[akka] case class FromClassCreator(clazz: Class[_ <: Actor]) extends Function0[Actor] {
def apply(): Actor = clazz.newInstance
}

View file

@ -34,6 +34,7 @@ endif
help:
@echo "Please use \`make <target>' where <target> is one of"
@echo " pygments to locally install the custom pygments styles"
@echo " epub to make an epub"
@echo " html to make standalone HTML files"
@echo " singlehtml to make a single large HTML file"
@echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
@ -53,6 +54,11 @@ pygments:
$(LOCALPACKAGES):
$(MAKE) pygments
epub: $(LOCALPACKAGES)
$(SPHINXBUILD) $(SPHINXFLAGS) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub
@echo
@echo "Build finished. The epub file is in $(BUILDDIR)/epub."
html: $(LOCALPACKAGES)
$(SPHINXBUILD) $(SPHINXFLAGS) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
@echo

View file

@ -52,6 +52,14 @@ html_context = {
'include_analytics': 'online' in tags
}
# -- Options for EPUB output ---------------------------------------------------
epub_author = "Typesafe Inc"
epub_language = "en"
epub_publisher = epub_author
epub_identifier = "http://doc.akka.io/docs/akka/snapshot/"
epub_scheme = "URL"
epub_cover = ("_sphinx/static/akka.png", "")
# -- Options for LaTeX output --------------------------------------------------
def setup(app):

View file

@ -13,7 +13,7 @@ Akka can be used in different ways:
be put into ``WEB-INF/lib``
- As a stand alone application by instantiating ActorSystem in a main class or
using the :ref:`microkernel`
using the :ref:`microkernel-scala` / :ref:`microkernel-java`
Using Akka as library
@ -27,5 +27,6 @@ modules to the stack.
Using Akka as a stand alone microkernel
----------------------------------------
Akka can also be run as a stand-alone microkernel. See :ref:`microkernel` for
Akka can also be run as a stand-alone microkernel. See
:ref:`microkernel-scala` / :ref:`microkernel-java` for
more information.

View file

@ -67,7 +67,8 @@ The Akka distribution includes the microkernel. To run the microkernel put your
application jar in the ``deploy`` directory and use the scripts in the ``bin``
directory.
More information is available in the documentation of the :ref:`microkernel`.
More information is available in the documentation of the
:ref:`microkernel-scala` / :ref:`microkernel-java`.
Using a build tool
------------------
@ -136,12 +137,17 @@ SBT installation instructions on `https://github.com/harrah/xsbt/wiki/Setup <htt
Using Akka with Eclipse
-----------------------
Setup SBT project and then use `sbteclipse <https://github.com/typesafehub/sbteclipse>`_ to generate Eclipse project.
Setup SBT project and then use `sbteclipse <https://github.com/typesafehub/sbteclipse>`_ to generate a Eclipse project.
Using Akka with IntelliJ IDEA
-----------------------------
Setup SBT project and then use `sbt-idea <https://github.com/mpeltonen/sbt-idea>`_ to generate IntelliJ IDEA project.
Setup SBT project and then use `sbt-idea <https://github.com/mpeltonen/sbt-idea>`_ to generate a IntelliJ IDEA project.
Using Akka with NetBeans
------------------------
Setup SBT project and then use `sbt-netbeans-plugin <https://github.com/remeniuk/sbt-netbeans-plugin>`_ to generate a NetBeans project.
Build from sources
------------------

View file

@ -1,5 +1,5 @@
.. _microkernel:
.. _microkernel-java:
Microkernel (Java)
==================

View file

@ -33,8 +33,100 @@ class DurableMailboxDocSpec extends AkkaSpec(DurableMailboxDocSpec.config) {
"configuration of dispatcher with durable mailbox" in {
//#dispatcher-config-use
val myActor = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), name = "myactor")
val myActor = system.actorOf(Props[MyActor].
withDispatcher("my-dispatcher"), name = "myactor")
//#dispatcher-config-use
}
}
//#custom-mailbox
import com.typesafe.config.Config
import akka.actor.ActorContext
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.dispatch.Envelope
import akka.dispatch.MailboxType
import akka.dispatch.MessageQueue
import akka.actor.mailbox.DurableMessageQueue
import akka.actor.mailbox.DurableMessageSerialization
class MyMailboxType(systemSettings: ActorSystem.Settings, config: Config)
extends MailboxType {
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
case Some(o) new MyMessageQueue(o)
case None throw new IllegalArgumentException(
"requires an owner (i.e. does not work with BalancingDispatcher)")
}
}
class MyMessageQueue(_owner: ActorContext)
extends DurableMessageQueue(_owner) with DurableMessageSerialization {
val storage = new QueueStorage
def enqueue(receiver: ActorRef, envelope: Envelope) {
val data: Array[Byte] = serialize(envelope)
storage.push(data)
}
def dequeue(): Envelope = {
val data: Option[Array[Byte]] = storage.pull()
data.map(deserialize).orNull
}
def hasMessages: Boolean = !storage.isEmpty
def numberOfMessages: Int = storage.size
/**
* Called when the mailbox is disposed.
* An ordinary mailbox would send remaining messages to deadLetters,
* but the purpose of a durable mailbox is to continue
* with the same message queue when the actor is started again.
*/
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = ()
}
//#custom-mailbox
// dummy
class QueueStorage {
import java.util.concurrent.ConcurrentLinkedQueue
val queue = new ConcurrentLinkedQueue[Array[Byte]]
def push(data: Array[Byte]): Unit = queue.offer(data)
def pull(): Option[Array[Byte]] = Option(queue.poll())
def isEmpty: Boolean = queue.isEmpty
def size: Int = queue.size
}
//#custom-mailbox-test
import akka.actor.mailbox.DurableMailboxSpec
object MyMailboxSpec {
val config = """
MyStorage-dispatcher {
mailbox-type = akka.docs.actor.mailbox.MyMailboxType
}
"""
}
class MyMailboxSpec extends DurableMailboxSpec("MyStorage", MyMailboxSpec.config) {
override def atStartup() {
}
override def atTermination() {
}
"MyMailbox" must {
"deliver a message" in {
val actor = createMailboxTestActor()
implicit val sender = testActor
actor ! "hello"
expectMsg("hello")
}
// add more tests
}
}

View file

@ -4,9 +4,8 @@
package akka.docs.actor.mailbox;
//#imports
import akka.actor.UntypedActorFactory;
import akka.actor.UntypedActor;
import akka.actor.Props;
import akka.actor.ActorRef;
//#imports
@ -16,8 +15,8 @@ import org.junit.Test;
import akka.testkit.AkkaSpec;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
import static org.junit.Assert.*;
@ -39,12 +38,8 @@ public class DurableMailboxDocTestBase {
@Test
public void configDefinedDispatcher() {
//#dispatcher-config-use
ActorRef myActor = system.actorOf(
new Props().withDispatcher("my-dispatcher").withCreator(new UntypedActorFactory() {
public UntypedActor create() {
return new MyUntypedActor();
}
}), "myactor");
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).
withDispatcher("my-dispatcher"), "myactor");
//#dispatcher-config-use
myActor.tell("test");
}

View file

@ -9,40 +9,45 @@
Overview
========
Akka supports a set of durable mailboxes. A durable mailbox is a replacement for
the standard actor mailbox that is durable. What this means in practice is that
if there are pending messages in the actor's mailbox when the node of the actor
resides on crashes, then when you restart the node, the actor will be able to
continue processing as if nothing had happened; with all pending messages still
in its mailbox.
A durable mailbox is a mailbox which stores the messages on durable storage.
What this means in practice is that if there are pending messages in the actor's
mailbox when the node of the actor resides on crashes, then when you restart the
node, the actor will be able to continue processing as if nothing had happened;
with all pending messages still in its mailbox.
None of these mailboxes implements transactions for current message. It's possible
You configure durable mailboxes through the dispatcher. The actor is oblivious
to which type of mailbox it is using.
This gives you an excellent way of creating bulkheads in your application, where
groups of actors sharing the same dispatcher also share the same backing
storage. Read more about that in the :ref:`dispatchers-scala` documentation.
One basic file based durable mailbox is provided by Akka out-of-the-box.
Other implementations can easily be added. Some are available as separate community
Open Source projects, such as:
* `AMQP Durable Mailbox <https://github.com/drexin/akka-amqp-mailbox>`_
A durable mailbox is like any other mailbox not likely to be transactional. It's possible
if the actor crashes after receiving a message, but before completing processing of
it, that the message could be lost.
.. warning:: **IMPORTANT**
.. warning::
None of these mailboxes work with blocking message send, i.e. the message
A durable mailbox typically doesn't work with blocking message send, i.e. the message
send operations that are relying on futures; ``?`` or ``ask``. If the node
has crashed and then restarted, the thread that was blocked waiting for the
reply is gone and there is no way we can deliver the message.
The durable mailboxes supported out-of-the-box are:
- ``FileBasedMailbox`` -- backed by a journaling transaction log on the local file system
File-based durable mailbox
==========================
You can easily implement your own mailbox. Look at the existing implementation for inspiration.
.. _DurableMailbox.General:
General Usage
-------------
The durable mailboxes and their configuration options reside in the
``akka.actor.mailbox`` package.
You configure durable mailboxes through the dispatcher. The
actor is oblivious to which type of mailbox it is using.
This mailbox is backed by a journaling transaction log on the local file
system. It is the simplest to use since it does not require an extra
infrastructure piece to administer, but it is usually sufficient and just what
you need.
In the configuration of the dispatcher you specify the fully qualified class name
of the mailbox:
@ -60,32 +65,38 @@ Corresponding example in Java:
.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java
:include: imports,dispatcher-config-use
The actor is oblivious to which type of mailbox it is using.
This gives you an excellent way of creating bulkheads in your application, where
groups of actors sharing the same dispatcher also share the same backing
storage. Read more about that in the :ref:`dispatchers-scala` documentation.
File-based durable mailbox
==========================
This mailbox is backed by a journaling transaction log on the local file
system. It is the simplest to use since it does not require an extra
infrastructure piece to administer, but it is usually sufficient and just what
you need.
You configure durable mailboxes through the dispatcher, as described in
:ref:`DurableMailbox.General` with the following mailbox type.
Config::
my-dispatcher {
mailbox-type = akka.actor.mailbox.FileBasedMailboxType
}
You can also configure and tune the file-based durable mailbox. This is done in
the ``akka.actor.mailbox.file-based`` section in the :ref:`configuration`.
.. literalinclude:: ../../akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf
:language: none
How to implement a durable mailbox
==================================
Here is an example of how to implement a custom durable mailbox. Essentially it consists of
a configurator (MailboxType) and a queue implementation (DurableMessageQueue).
The envelope contains the message sent to the actor, and information about sender. It is the
envelope that needs to be stored. As a help utility you can mixin DurableMessageSerialization
to serialize and deserialize the envelope using the ordinary :ref:`serialization-scala`
mechanism. This optional and you may store the envelope data in any way you like.
.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala
:include: custom-mailbox
To facilitate testing of a durable mailbox you may use ``DurableMailboxSpec`` as base class.
It implements a few basic tests and helps you setup the a fixture. More tests can be
added in concrete subclass like this:
.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala
:include: custom-mailbox-test
You find DurableMailboxDocSpec in ``akka-mailboxes-common-test-2.1-SNAPSHOT.jar``.
Add this dependency::
"com.typesafe.akka" % "akka-mailboxes-common-test" % "2.1-SNAPSHOT"
For more inspiration you can look at the old implementations based on Redis, MongoDB, Beanstalk,
and ZooKeeper, which can be found in Akka git repository tag
`v2.0.1 <https://github.com/akka/akka/tree/v2.0.1/akka-durable-mailboxes>`_.

View file

@ -0,0 +1,156 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.testkit
//#testkit-usage
import scala.util.Random
import org.scalatest.BeforeAndAfterAll
import org.scalatest.WordSpec
import org.scalatest.matchers.ShouldMatchers
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.testkit.DefaultTimeout
import akka.testkit.ImplicitSender
import akka.testkit.TestKit
import akka.util.duration._
/**
* a Test to show some TestKit examples
*/
class TestKitUsageSpec
extends TestKit(ActorSystem("TestKitUsageSpec",
ConfigFactory.parseString(TestKitUsageSpec.config)))
with DefaultTimeout with ImplicitSender
with WordSpec with ShouldMatchers with BeforeAndAfterAll {
import TestKitUsageSpec._
val echoRef = system.actorOf(Props(new EchoActor))
val forwardRef = system.actorOf(Props(new ForwardingActor(testActor)))
val filterRef = system.actorOf(Props(new FilteringActor(testActor)))
val randomHead = Random.nextInt(6)
val randomTail = Random.nextInt(10)
val headList = Seq().padTo(randomHead, "0")
val tailList = Seq().padTo(randomTail, "1")
val seqRef = system.actorOf(Props(new SequencingActor(testActor, headList, tailList)))
override def afterAll {
system.shutdown()
}
"An EchoActor" should {
"Respond with the same message it receives" in {
within(500 millis) {
echoRef ! "test"
expectMsg("test")
}
}
}
"A ForwardingActor" should {
"Forward a message it receives" in {
within(500 millis) {
forwardRef ! "test"
expectMsg("test")
}
}
}
"A FilteringActor" should {
"Filter all messages, except expected messagetypes it receives" in {
var messages = Seq[String]()
within(500 millis) {
filterRef ! "test"
expectMsg("test")
filterRef ! 1
expectNoMsg
filterRef ! "some"
filterRef ! "more"
filterRef ! 1
filterRef ! "text"
filterRef ! 1
receiveWhile(500 millis) {
case msg: String messages = msg +: messages
}
}
messages.length should be(3)
messages.reverse should be(Seq("some", "more", "text"))
}
}
"A SequencingActor" should {
"receive an interesting message at some point " in {
within(500 millis) {
ignoreMsg {
case msg: String msg != "something"
}
seqRef ! "something"
expectMsg("something")
ignoreMsg {
case msg: String msg == "1"
}
expectNoMsg
ignoreNoMsg
}
}
}
}
object TestKitUsageSpec {
// Define your test specific configuration here
val config = """
akka {
loglevel = "WARNING"
}
"""
/**
* An Actor that echoes everything you send to it
*/
class EchoActor extends Actor {
def receive = {
case msg sender ! msg
}
}
/**
* An Actor that forwards every message to a next Actor
*/
class ForwardingActor(next: ActorRef) extends Actor {
def receive = {
case msg next ! msg
}
}
/**
* An Actor that only forwards certain messages to a next Actor
*/
class FilteringActor(next: ActorRef) extends Actor {
def receive = {
case msg: String next ! msg
case _ None
}
}
/**
* An actor that sends a sequence of messages with a random head list, an
* interesting value and a random tail list. The idea is that you would
* like to test that the interesting value is received and that you cant
* be bothered with the rest
*/
class SequencingActor(next: ActorRef, head: Seq[String], tail: Seq[String])
extends Actor {
def receive = {
case msg {
head foreach { next ! _ }
next ! msg
tail foreach { next ! _ }
}
}
}
}
//#testkit-usage

View file

@ -1,5 +1,5 @@
.. _microkernel:
.. _microkernel-scala:
Microkernel (Scala)
===================

View file

@ -194,6 +194,8 @@ is a whole set of examination methods, e.g. receiving all consecutive messages
matching certain criteria, receiving a whole sequence of fixed messages or
classes, receiving nothing for some time, etc.
The ActorSystem passed in to the constructor of TestKit is accessible with
the the :obj:`system` member.
Remember to shut down the actor system after the test is finished (also in case
of failure) so that all actors—including the test actor—are stopped.

View file

@ -6,142 +6,5 @@ TestKit Example (Scala)
Ray Roestenburg's example code from `his blog <http://roestenburg.agilesquad.com/2011/02/unit-testing-akka-actors-with-testkit_12.html>`_ adapted to work with Akka 2.x.
.. code-block:: scala
.. includecode:: code/akka/docs/testkit/TestkitUsageSpec.scala#testkit-usage
package unit.akka
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.{WordSpec, BeforeAndAfterAll}
import akka.actor.Actor._
import akka.util.duration._
import akka.testkit.TestKit
import java.util.concurrent.TimeUnit
import akka.actor.{ActorRef, Actor}
import util.Random
/**
* a Test to show some TestKit examples
*/
class TestKitUsageSpec extends WordSpec with BeforeAndAfterAll with ShouldMatchers with TestKit {
val system = ActorSystem()
import system._
val echoRef = actorOf(Props(new EchoActor))
val forwardRef = actorOf(Props(new ForwardingActor(testActor)))
val filterRef = actorOf(Props(new FilteringActor(testActor)))
val randomHead = Random.nextInt(6)
val randomTail = Random.nextInt(10)
val headList = List().padTo(randomHead, "0")
val tailList = List().padTo(randomTail, "1")
val seqRef = actorOf(Props(new SequencingActor(testActor, headList, tailList)))
override protected def afterAll(): scala.Unit = {
stopTestActor
echoRef.stop()
forwardRef.stop()
filterRef.stop()
seqRef.stop()
}
"An EchoActor" should {
"Respond with the same message it receives" in {
within(100 millis) {
echoRef ! "test"
expectMsg("test")
}
}
}
"A ForwardingActor" should {
"Forward a message it receives" in {
within(100 millis) {
forwardRef ! "test"
expectMsg("test")
}
}
}
"A FilteringActor" should {
"Filter all messages, except expected messagetypes it receives" in {
var messages = List[String]()
within(100 millis) {
filterRef ! "test"
expectMsg("test")
filterRef ! 1
expectNoMsg
filterRef ! "some"
filterRef ! "more"
filterRef ! 1
filterRef ! "text"
filterRef ! 1
receiveWhile(500 millis) {
case msg: String => messages = msg :: messages
}
}
messages.length should be(3)
messages.reverse should be(List("some", "more", "text"))
}
}
"A SequencingActor" should {
"receive an interesting message at some point " in {
within(100 millis) {
seqRef ! "something"
ignoreMsg {
case msg: String => msg != "something"
}
expectMsg("something")
ignoreMsg {
case msg: String => msg == "1"
}
expectNoMsg
}
}
}
}
/**
* An Actor that echoes everything you send to it
*/
class EchoActor extends Actor {
def receive = {
case msg => {
self.reply(msg)
}
}
}
/**
* An Actor that forwards every message to a next Actor
*/
class ForwardingActor(next: ActorRef) extends Actor {
def receive = {
case msg => {
next ! msg
}
}
}
/**
* An Actor that only forwards certain messages to a next Actor
*/
class FilteringActor(next: ActorRef) extends Actor {
def receive = {
case msg: String => {
next ! msg
}
case _ => None
}
}
/**
* An actor that sends a sequence of messages with a random head list, an interesting value and a random tail list
* The idea is that you would like to test that the interesting value is received and that you cant be bothered with the rest
*/
class SequencingActor(next: ActorRef, head: List[String], tail: List[String]) extends Actor {
def receive = {
case msg => {
head map (next ! _)
next ! msg
tail map (next ! _)
}
}
}

View file

@ -25,19 +25,17 @@ class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSp
}
}
def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[FileBasedMessageQueue]
def clean {
def clean() {
FileUtils.deleteDirectory(new java.io.File(queuePath))
}
override def atStartup() {
clean
clean()
super.atStartup()
}
override def atTermination() {
clean
clean()
super.atTermination()
}
}

View file

@ -1,30 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
option java_package = "akka.actor.mailbox";
option optimize_for = SPEED;
/******************************************
Compile with:
cd ./akka-durable-mailboxes/akka-mailboxes-common/src/main/protocol
protoc MailboxProtocol.proto --java_out ../java
*******************************************/
/**
* Defines the durable mailbox message.
*/
message DurableMailboxMessageProtocol {
required string ownerAddress = 1;
optional string senderAddress = 2;
optional UuidProtocol futureUuid = 3;
required bytes message = 4;
}
/**
* Defines a UUID.
*/
message UuidProtocol {
required uint64 high = 1;
required uint64 low = 2;
}

View file

@ -28,6 +28,9 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒
def serialize(durableMessage: Envelope): Array[Byte] = {
// It's alright to use ref.path.toString here
// When the sender is a LocalActorRef it should be local when deserialized also.
// When the sender is a RemoteActorRef the path.toString already contains remote address information.
def serializeActorRef(ref: ActorRef): ActorRefProtocol = ActorRefProtocol.newBuilder.setPath(ref.path.toString).build
val message = MessageSerializer.serialize(system, durableMessage.message.asInstanceOf[AnyRef])

View file

@ -3,14 +3,25 @@
*/
package akka.actor.mailbox
import akka.testkit.AkkaSpec
import akka.testkit.TestLatch
import akka.util.duration._
import java.io.InputStream
import scala.annotation.tailrec
import DurableMailboxSpecActorFactory.AccumulatorActor
import DurableMailboxSpecActorFactory.MailboxTestActor
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.LocalActorRef
import akka.actor.Props
import akka.actor.actorRef2Scala
import akka.dispatch.Mailbox
import akka.testkit.TestKit
import akka.util.duration.intToDurationInt
import com.typesafe.config.Config
import akka.actor._
import akka.dispatch.{ Mailbox, Await }
import com.typesafe.config.ConfigFactory
import java.io.InputStream
import java.util.concurrent.TimeoutException
import org.scalatest.BeforeAndAfterAll
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import scala.annotation.tailrec
object DurableMailboxSpecActorFactory {
@ -28,13 +39,62 @@ object DurableMailboxSpecActorFactory {
}
object DurableMailboxSpec {
def fallbackConfig: Config = ConfigFactory.parseString("""
akka {
event-handlers = ["akka.testkit.TestEventListener"]
loglevel = "WARNING"
stdout-loglevel = "WARNING"
}
""")
}
/**
* Reusable test fixture for durable mailboxes. Implements a few basic tests. More
* tests can be added in concrete subclass.
*
* Subclass must define dispatcher in the supplied config for the specific backend.
* The id of the dispatcher must be the same as the `<backendName>-dispatcher`.
*/
abstract class DurableMailboxSpec(val backendName: String, config: String) extends AkkaSpec(config) {
abstract class DurableMailboxSpec(system: ActorSystem, val backendName: String)
extends TestKit(system) with WordSpec with MustMatchers with BeforeAndAfterAll {
import DurableMailboxSpecActorFactory._
/**
* Subclass must define dispatcher in the supplied config for the specific backend.
* The id of the dispatcher must be the same as the `<backendName>-dispatcher`.
*/
def this(backendName: String, config: String) = {
this(ActorSystem(backendName + "BasedDurableMailboxSpec",
ConfigFactory.parseString(config).withFallback(DurableMailboxSpec.fallbackConfig)),
backendName)
}
final override def beforeAll {
atStartup()
}
/**
* May be implemented in concrete subclass to do additional things once before test
* cases are run.
*/
protected def atStartup() {}
final override def afterAll {
system.shutdown()
try system.awaitTermination(5 seconds) catch {
case _: TimeoutException system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
}
atTermination()
}
/**
* May be implemented in concrete subclass to do additional things once after all
* test cases have been run.
*/
def atTermination() {}
protected def streamMustContain(in: InputStream, words: String): Unit = {
val output = new Array[Byte](8192)
@ -60,7 +120,8 @@ abstract class DurableMailboxSpec(val backendName: String, config: String) exten
case some system.actorOf(props.withDispatcher(backendName + "-dispatcher"), some)
}
def isDurableMailbox(m: Mailbox): Boolean
private def isDurableMailbox(m: Mailbox): Boolean =
m.messageQueue.isInstanceOf[DurableMessageQueue]
"A " + backendName + " based mailbox backed actor" must {

View file

@ -3,7 +3,6 @@
# ============== Akka Cluster Administration Tool ==============
#
# This script is meant to be used from within the Akka distribution.
# Requires setting $AKKA_HOME to the root of the distribution.
#
# Add these options to the sbt or startup script:
# java \
@ -15,9 +14,12 @@
# FIXME support authentication? if so add: -Dcom.sun.management.jmxremote.password.file=<path to file> AND tweak this script to support it (arg need 'user:passwd' instead of '-')
# NOTE: The 'cmdline-jmxclient' JAR is available as part of the Akka distribution.
# Provided by Typesafe Maven Repository: http://repo.typesafe.com/typesafe/releases/cmdline-jmxclient.
JMX_CLIENT="java -jar $AKKA_HOME/lib/akka/cmdline-jmxclient-0.10.3.jar -"
declare AKKA_HOME="$(cd "$(cd "$(dirname "$0")"; pwd -P)"/..; pwd)"
[ -n "$JMX_CLIENT_CLASSPATH" ] || JMX_CLIENT_CLASSPATH="$AKKA_HOME/lib/akka/akka-kernel-*"
# NOTE: The 'cmdline-jmxclient' is available as part of the Akka distribution.
JMX_CLIENT="java -cp $JMX_CLIENT_CLASSPATH akka.jmx.Client -"
SELF=`basename $0` # script name
HOST=$1 # cluster node:port to talk to through JMX
@ -168,7 +170,7 @@ case "$2" in
;;
*)
printf "Usage: $SELF <node-hostname:jmx-port> <command> ...\n"
printf "Usage: bin/$SELF <node-hostname:jmx-port> <command> ...\n"
printf "\n"
printf "Supported commands are:\n"
printf "%26s - %s\n" "join <actor-system-url>" "Sends request a JOIN node with the specified URL"
@ -183,9 +185,9 @@ case "$2" in
printf "%26s - %s\n" has-convergence "Checks if there is a cluster convergence"
printf "Where the <actor-system-url> should be on the format of 'akka://actor-system-name@hostname:port'\n"
printf "\n"
printf "Examples: $SELF localhost:9999 is-available\n"
printf " $SELF localhost:9999 join akka://MySystem@darkstar:2552\n"
printf " $SELF localhost:9999 cluster-status\n"
printf "Examples: bin/$SELF localhost:9999 is-available\n"
printf " bin/$SELF localhost:9999 join akka://MySystem@darkstar:2552\n"
printf " bin/$SELF localhost:9999 cluster-status\n"
exit 1
;;
esac

View file

@ -0,0 +1,781 @@
/*
* Client
*
* $Id$
*
* Created on Nov 12, 2004
*
* Copyright (C) 2004 Internet Archive.
*
* This file is part of the Heritrix web crawler (crawler.archive.org).
*
* Heritrix is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser Public License as published by
* the Free Software Foundation; either version 2.1 of the License, or
* any later version.
*
* Heritrix is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser Public License for more details.
*
* You should have received a copy of the GNU Lesser Public License
* along with Heritrix; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
package akka.jmx;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.text.FieldPosition;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.logging.ConsoleHandler;
import java.util.logging.Handler;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.InstanceNotFoundException;
import javax.management.IntrospectionException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanFeatureInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanOperationInfo;
import javax.management.MBeanParameterInfo;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
/**
* A Simple Command-Line JMX Client.
* Tested against the JDK 1.5.0 JMX Agent.
* See <a href="http://java.sun.com/j2se/1.5.0/docs/guide/management/agent.html">Monitoring
* and Management Using JMX</a>.
* <p>Can supply credentials and do primitive string representation of tabular
* and composite openmbeans.
* @author stack
*/
public class Client {
private static final Logger logger =
Logger.getLogger(Client.class.getName());
/**
* Usage string.
*/
private static final String USAGE = "Usage: java -jar" +
" cmdline-jmxclient.jar USER:PASS HOST:PORT [BEAN] [COMMAND]\n" +
"Options:\n" +
" USER:PASS Username and password. Required. If none, pass '-'.\n" +
" E.g. 'controlRole:secret'\n" +
" HOST:PORT Hostname and port to connect to. Required." +
" E.g. localhost:8081.\n" +
" Lists registered beans if only USER:PASS and this" +
" argument.\n" +
" BEAN Optional target bean name. If present we list" +
" available operations\n" +
" and attributes.\n" +
" COMMAND Optional operation to run or attribute to fetch. If" +
" none supplied,\n" +
" all operations and attributes are listed. Attributes" +
" begin with a\n" +
" capital letter: e.g. 'Status' or 'Started'." +
" Operations do not.\n" +
" Operations can take arguments by adding an '=' " +
"followed by\n" +
" comma-delimited params. Pass multiple " +
"attributes/operations to run\n" +
" more than one per invocation. Use commands 'create' and " +
"'destroy'\n" +
" to instantiate and unregister beans ('create' takes name " +
"of class).\n" +
" Pass 'Attributes' to get listing of all attributes and " +
"and their\n" +
" values.\n" +
"Requirements:\n" +
" JDK1.5.0. If connecting to a SUN 1.5.0 JDK JMX Agent, remote side" +
" must be\n" +
" started with system properties such as the following:\n" +
" -Dcom.sun.management.jmxremote.port=PORT\n" +
" -Dcom.sun.management.jmxremote.authenticate=false\n" +
" -Dcom.sun.management.jmxremote.ssl=false\n" +
" The above will start the remote server with no password. See\n" +
" http://java.sun.com/j2se/1.5.0/docs/guide/management/agent.html" +
" for more on\n" +
" 'Monitoring and Management via JMX'.\n" +
"Client Use Examples:\n" +
" To list MBeans on a non-password protected remote agent:\n" +
" % java -jar cmdline-jmxclient-X.X.jar - localhost:8081 \\\n" +
" org.archive.crawler:name=Heritrix,type=Service\n" +
" To list attributes and attributes of the Heritrix MBean:\n" +
" % java -jar cmdline-jmxclient-X.X.jar - localhost:8081 \\\n" +
" org.archive.crawler:name=Heritrix,type=Service \\\n" +
" schedule=http://www.archive.org\n" +
" To set set logging level to FINE on a password protected JVM:\n" +
" % java -jar cmdline-jmxclient-X.X.jar controlRole:secret" +
" localhost:8081 \\\n" +
" java.util.logging:type=Logging \\\n" +
" setLoggerLevel=org.archive.crawler.Heritrix,FINE";
/**
* Pattern that matches a command name followed by
* an optional equals and optional comma-delimited list
* of arguments.
*/
protected static final Pattern CMD_LINE_ARGS_PATTERN =
Pattern.compile("^([^=]+)(?:(?:\\=)(.+))?$");
private static final String CREATE_CMD_PREFIX = "create=";
public static void main(String[] args) throws Exception {
Client client = new Client();
// Set the logger to use our all-on-one-line formatter.
Logger l = Logger.getLogger("");
Handler [] hs = l.getHandlers();
for (int i = 0; i < hs.length; i++) {
Handler h = hs[0];
if (h instanceof ConsoleHandler) {
h.setFormatter(client.new OneLineSimpleLogger());
}
}
client.execute(args);
}
protected static void usage() {
usage(0, null);
}
protected static void usage(int exitCode, String message) {
if (message != null && message.length() > 0) {
System.out.println(message);
}
System.out.println(USAGE);
System.exit(exitCode);
}
/**
* Constructor.
*/
public Client() {
super();
}
/**
* Parse a 'login:password' string. Assumption is that no
* colon in the login name.
* @param userpass
* @return Array of strings with login in first position.
*/
protected String [] parseUserpass(final String userpass) {
if (userpass == null || userpass.equals("-")) {
return null;
}
int index = userpass.indexOf(':');
if (index <= 0) {
throw new RuntimeException("Unable to parse: " +userpass);
}
return new String [] {userpass.substring(0, index),
userpass.substring(index + 1)};
}
/**
* @param login
* @param password
* @return Credentials as map for RMI.
*/
protected Map formatCredentials(final String login,
final String password) {
Map env = null;
String[] creds = new String[] {login, password};
env = new HashMap(1);
env.put(JMXConnector.CREDENTIALS, creds);
return env;
}
protected JMXConnector getJMXConnector(final String hostport,
final String login, final String password)
throws IOException {
// Make up the jmx rmi URL and get a connector.
JMXServiceURL rmiurl = new JMXServiceURL("service:jmx:rmi://"
+ hostport + "/jndi/rmi://" + hostport + "/jmxrmi");
return JMXConnectorFactory.connect(rmiurl,
formatCredentials(login, password));
}
protected ObjectName getObjectName(final String beanname)
throws MalformedObjectNameException, NullPointerException {
return notEmpty(beanname)? new ObjectName(beanname): null;
}
/**
* Version of execute called from the cmdline.
* Prints out result of execution on stdout.
* Parses cmdline args. Then calls {@link #execute(String, String,
* String, String, String[], boolean)}.
* @param args Cmdline args.
* @throws Exception
*/
protected void execute(final String [] args)
throws Exception {
// Process command-line.
if (args.length == 0 || args.length == 1) {
usage();
}
String userpass = args[0];
String hostport = args[1];
String beanname = null;
String [] command = null;
if (args.length > 2) {
beanname = args[2];
}
if (args.length > 3) {
command = new String [args.length - 3];
for (int i = 3; i < args.length; i++) {
command[i - 3] = args[i];
}
}
String [] loginPassword = parseUserpass(userpass);
Object [] result = execute(hostport,
((loginPassword == null)? null: loginPassword[0]),
((loginPassword == null)? null: loginPassword[1]), beanname,
command);
// Print out results on stdout. Only log if a result.
if (result != null) {
for (int i = 0; i < result.length; i++) {
if (result[i] != null && result[i].toString().length() > 0) {
if (command != null) {
logger.info(command[i] + ": " + result[i]);
} else {
logger.info("\n" + result[i].toString());
}
}
}
}
}
protected Object [] execute(final String hostport, final String login,
final String password, final String beanname,
final String [] command)
throws Exception {
return execute(hostport, login, password, beanname, command, false);
}
public Object [] executeOneCmd(final String hostport, final String login,
final String password, final String beanname,
final String command)
throws Exception {
return execute(hostport, login, password, beanname,
new String[] {command}, true);
}
/**
* Execute command against remote JMX agent.
* @param hostport 'host:port' combination.
* @param login RMI login to use.
* @param password RMI password to use.
* @param beanname Name of remote bean to run command against.
* @param command Array of commands to run.
* @param oneBeanOnly Set true if passed <code>beanname</code> is
* an exact name and the query for a bean is only supposed to return
* one bean instance. If not, we raise an exception (Otherwise, if false,
* then we deal with possibility of multiple bean instances coming back
* from query). Set to true when want to get an attribute or run an
* operation.
* @return Array of results -- one per command.
* @throws Exception
*/
protected Object [] execute(final String hostport, final String login,
final String password, final String beanname,
final String [] command, final boolean oneBeanOnly)
throws Exception {
JMXConnector jmxc = getJMXConnector(hostport, login, password);
Object [] result = null;
try {
result = doBeans(jmxc.getMBeanServerConnection(),
getObjectName(beanname), command, oneBeanOnly);
} finally {
jmxc.close();
}
return result;
}
protected boolean notEmpty(String s) {
return s != null && s.length() > 0;
}
protected Object [] doBeans(final MBeanServerConnection mbsc,
final ObjectName objName, final String[] command,
final boolean oneBeanOnly)
throws Exception {
Object [] result = null;
Set beans = mbsc.queryMBeans(objName, null);
if (beans.size() == 0) {
// No bean found. Check if we are to create a bean?
if (command.length == 1 && notEmpty(command[0])
&& command[0].startsWith(CREATE_CMD_PREFIX)) {
String className =
command[0].substring(CREATE_CMD_PREFIX.length());
mbsc.createMBean(className, objName);
} else {
// TODO: Is there a better JMX exception that RE for this
// scenario?
throw new RuntimeException(objName.getCanonicalName() +
" not registered.");
}
} else if (beans.size() == 1) {
result = doBean(mbsc, (ObjectInstance) beans.iterator().next(),
command);
} else {
if (oneBeanOnly) {
throw new RuntimeException("Only supposed to be one bean " +
"query result");
}
// This is case of multiple beans in query results.
// Print name of each into a StringBuffer. Return as one
// result.
StringBuffer buffer = new StringBuffer();
for (Iterator i = beans.iterator(); i.hasNext();) {
Object obj = i.next();
if (obj instanceof ObjectName) {
buffer.append((((ObjectName) obj).getCanonicalName()));
} else if (obj instanceof ObjectInstance) {
buffer.append((((ObjectInstance) obj).getObjectName()
.getCanonicalName()));
} else {
throw new RuntimeException("Unexpected object type: " + obj);
}
buffer.append("\n");
}
result = new String [] {buffer.toString()};
}
return result;
}
/**
* Get attribute or run operation against passed bean <code>instance</code>.
*
* @param mbsc Server connection.
* @param instance Bean instance we're to get attributes from or run
* operation against.
* @param command Command to run (May be null).
* @return Result. If multiple commands, multiple results.
* @throws Exception
*/
protected Object [] doBean(MBeanServerConnection mbsc,
ObjectInstance instance, String [] command)
throws Exception {
// If no command, then print out list of attributes and operations.
if (command == null || command.length <= 0) {
return new String [] {listOptions(mbsc, instance)};
}
// Maybe multiple attributes/operations listed on one command line.
Object [] result = new Object[command.length];
for (int i = 0; i < command.length; i++) {
result[i] = doSubCommand(mbsc, instance, command[i]);
}
return result;
}
public Object doSubCommand(MBeanServerConnection mbsc,
ObjectInstance instance, String subCommand)
throws Exception {
// First, handle special case of our being asked to destroy a bean.
if (subCommand.equals("destroy")) {
mbsc.unregisterMBean(instance.getObjectName());
return null;
} else if (subCommand.startsWith(CREATE_CMD_PREFIX)) {
throw new IllegalArgumentException("You cannot call create " +
"on an already existing bean.");
}
// Get attribute and operation info.
MBeanAttributeInfo [] attributeInfo =
mbsc.getMBeanInfo(instance.getObjectName()).getAttributes();
MBeanOperationInfo [] operationInfo =
mbsc.getMBeanInfo(instance.getObjectName()).getOperations();
// Now, bdbje JMX bean doesn't follow the convention of attributes
// having uppercase first letter and operations having lowercase
// first letter. But most beans do. Be prepared to handle the bdbje
// case.
Object result = null;
if (Character.isUpperCase(subCommand.charAt(0))) {
// Probably an attribute.
if (!isFeatureInfo(attributeInfo, subCommand) &&
isFeatureInfo(operationInfo, subCommand)) {
// Its not an attribute name. Looks like its name of an
// operation. Try it.
result =
doBeanOperation(mbsc, instance, subCommand, operationInfo);
} else {
// Then it is an attribute OR its not an attribute name nor
// operation name and the below invocation will throw a
// AttributeNotFoundException.
result = doAttributeOperation(mbsc, instance, subCommand,
attributeInfo);
}
} else {
// Must be an operation.
if (!isFeatureInfo(operationInfo, subCommand) &&
isFeatureInfo(attributeInfo, subCommand)) {
// Its not an operation name but looks like it could be an
// attribute name. Try it.
result = doAttributeOperation(mbsc, instance, subCommand,
attributeInfo);
} else {
// Its an operation name OR its neither operation nor attribute
// name and the below will throw a NoSuchMethodException.
result =
doBeanOperation(mbsc, instance, subCommand, operationInfo);
}
}
// Look at the result. Is it of composite or tabular type?
// If so, convert to a String representation.
if (result instanceof CompositeData) {
result = recurseCompositeData(new StringBuffer("\n"), "", "",
(CompositeData)result);
} else if (result instanceof TabularData) {
result = recurseTabularData(new StringBuffer("\n"), "", "",
(TabularData)result);
} else if (result instanceof String []) {
String [] strs = (String [])result;
StringBuffer buffer = new StringBuffer("\n");
for (int i = 0; i < strs.length; i++) {
buffer.append(strs[i]);
buffer.append("\n");
}
result = buffer;
} else if (result instanceof AttributeList) {
AttributeList list = (AttributeList)result;
if (list.size() <= 0) {
result = null;
} else {
StringBuffer buffer = new StringBuffer("\n");
for (Iterator ii = list.iterator(); ii.hasNext();) {
Attribute a = (Attribute)ii.next();
buffer.append(a.getName());
buffer.append(": ");
buffer.append(a.getValue());
buffer.append("\n");
}
result = buffer;
}
}
return result;
}
protected boolean isFeatureInfo(MBeanFeatureInfo [] infos, String cmd) {
return getFeatureInfo(infos, cmd) != null;
}
protected MBeanFeatureInfo getFeatureInfo(MBeanFeatureInfo [] infos,
String cmd) {
// Cmd may be carrying arguments. Don't count them in the compare.
int index = cmd.indexOf('=');
String name = (index > 0)? cmd.substring(0, index): cmd;
for (int i = 0; i < infos.length; i++) {
if (infos[i].getName().equals(name)) {
return infos[i];
}
}
return null;
}
protected StringBuffer recurseTabularData(StringBuffer buffer,
String indent, String name, TabularData data) {
addNameToBuffer(buffer, indent, name);
java.util.Collection c = data.values();
for (Iterator i = c.iterator(); i.hasNext();) {
Object obj = i.next();
if (obj instanceof CompositeData) {
recurseCompositeData(buffer, indent + " ", "",
(CompositeData)obj);
} else if (obj instanceof TabularData) {
recurseTabularData(buffer, indent, "",
(TabularData)obj);
} else {
buffer.append(obj);
}
}
return buffer;
}
protected StringBuffer recurseCompositeData(StringBuffer buffer,
String indent, String name, CompositeData data) {
indent = addNameToBuffer(buffer, indent, name);
for (Iterator i = data.getCompositeType().keySet().iterator();
i.hasNext();) {
String key = (String)i.next();
Object o = data.get(key);
if (o instanceof CompositeData) {
recurseCompositeData(buffer, indent + " ", key,
(CompositeData)o);
} else if (o instanceof TabularData) {
recurseTabularData(buffer, indent, key, (TabularData)o);
} else {
buffer.append(indent);
buffer.append(key);
buffer.append(": ");
buffer.append(o);
buffer.append("\n");
}
}
return buffer;
}
protected String addNameToBuffer(StringBuffer buffer, String indent,
String name) {
if (name == null || name.length() == 0) {
return indent;
}
buffer.append(indent);
buffer.append(name);
buffer.append(":\n");
// Move all that comes under this 'name' over by one space.
return indent + " ";
}
/**
* Class that parses commandline arguments.
* Expected format is 'operationName=arg0,arg1,arg2...'. We are assuming no
* spaces nor comma's in argument values.
*/
protected class CommandParse {
private String cmd;
private String [] args;
protected CommandParse(String command) throws ParseException {
parse(command);
}
private void parse(String command) throws ParseException {
Matcher m = CMD_LINE_ARGS_PATTERN.matcher(command);
if (m == null || !m.matches()) {
throw new ParseException("Failed parse of " + command, 0);
}
this.cmd = m.group(1);
if (m.group(2) != null && m.group(2).length() > 0) {
this.args = m.group(2).split(",");
} else {
this.args = null;
}
}
protected String getCmd() {
return this.cmd;
}
protected String [] getArgs() {
return this.args;
}
}
protected Object doAttributeOperation(MBeanServerConnection mbsc,
ObjectInstance instance, String command, MBeanAttributeInfo [] infos)
throws Exception {
// Usually we get attributes. If an argument, then we're being asked
// to set attribute.
CommandParse parse = new CommandParse(command);
if (parse.getArgs() == null || parse.getArgs().length == 0) {
// Special-casing. If the subCommand is 'Attributes', then return
// list of all attributes.
if (command.equals("Attributes")) {
String [] names = new String[infos.length];
for (int i = 0; i < infos.length; i++) {
names[i] = infos[i].getName();
}
return mbsc.getAttributes(instance.getObjectName(), names);
}
return mbsc.getAttribute(instance.getObjectName(), parse.getCmd());
}
if (parse.getArgs().length != 1) {
throw new IllegalArgumentException("One only argument setting " +
"attribute values: " + parse.getArgs());
}
// Get first attribute of name 'cmd'. Assumption is no method
// overrides. Then, look at the attribute and use its type.
MBeanAttributeInfo info =
(MBeanAttributeInfo)getFeatureInfo(infos, parse.getCmd());
java.lang.reflect.Constructor c = Class.forName(
info.getType()).getConstructor(new Class[] {String.class});
Attribute a = new Attribute(parse.getCmd(),
c.newInstance(new Object[] {parse.getArgs()[0]}));
mbsc.setAttribute(instance.getObjectName(), a);
return null;
}
protected Object doBeanOperation(MBeanServerConnection mbsc,
ObjectInstance instance, String command, MBeanOperationInfo [] infos)
throws Exception {
// Parse command line.
CommandParse parse = new CommandParse(command);
// Get first method of name 'cmd'. Assumption is no method
// overrides. Then, look at the method and use its signature
// to make sure client sends over parameters of the correct type.
MBeanOperationInfo op =
(MBeanOperationInfo)getFeatureInfo(infos, parse.getCmd());
Object result = null;
if (op == null) {
result = "Operation " + parse.getCmd() + " not found.";
} else {
MBeanParameterInfo [] paraminfos = op.getSignature();
int paraminfosLength = (paraminfos == null)? 0: paraminfos.length;
int objsLength = (parse.getArgs() == null)?
0: parse.getArgs().length;
if (paraminfosLength != objsLength) {
result = "Passed param count does not match signature count";
} else {
String [] signature = new String[paraminfosLength];
Object [] params = (paraminfosLength == 0)? null
: new Object[paraminfosLength];
for (int i = 0; i < paraminfosLength; i++) {
MBeanParameterInfo paraminfo = paraminfos[i];
java.lang.reflect.Constructor c = Class.forName(
paraminfo.getType()).getConstructor(
new Class[] {String.class});
params[i] =
c.newInstance(new Object[] {parse.getArgs()[i]});
signature[i] = paraminfo.getType();
}
result = mbsc.invoke(instance.getObjectName(), parse.getCmd(),
params, signature);
}
}
return result;
}
protected String listOptions(MBeanServerConnection mbsc,
ObjectInstance instance)
throws InstanceNotFoundException, IntrospectionException,
ReflectionException, IOException {
StringBuffer result = new StringBuffer();
MBeanInfo info = mbsc.getMBeanInfo(instance.getObjectName());
MBeanAttributeInfo [] attributes = info.getAttributes();
if (attributes.length > 0) {
result.append("Attributes:");
result.append("\n");
for (int i = 0; i < attributes.length; i++) {
result.append(' ' + attributes[i].getName() +
": " + attributes[i].getDescription() +
" (type=" + attributes[i].getType() +
")");
result.append("\n");
}
}
MBeanOperationInfo [] operations = info.getOperations();
if (operations.length > 0) {
result.append("Operations:");
result.append("\n");
for (int i = 0; i < operations.length; i++) {
MBeanParameterInfo [] params = operations[i].getSignature();
StringBuffer paramsStrBuffer = new StringBuffer();
if (params != null) {
for (int j = 0; j < params.length; j++) {
paramsStrBuffer.append("\n name=");
paramsStrBuffer.append(params[j].getName());
paramsStrBuffer.append(" type=");
paramsStrBuffer.append(params[j].getType());
paramsStrBuffer.append(" ");
paramsStrBuffer.append(params[j].getDescription());
}
}
result.append(' ' + operations[i].getName() +
": " + operations[i].getDescription() +
"\n Parameters " + params.length +
", return type=" + operations[i].getReturnType() +
paramsStrBuffer.toString());
result.append("\n");
}
}
return result.toString();
}
/**
* Logger that writes entry on one line with less verbose date.
* Modelled on the OneLineSimpleLogger from Heritrix.
*
* @author stack
* @version $Revision$, $Date$
*/
private class OneLineSimpleLogger extends SimpleFormatter {
/**
* Date instance.
*
* Keep around instance of date.
*/
private Date date = new Date();
/**
* Field position instance.
*
* Keep around this instance.
*/
private FieldPosition position = new FieldPosition(0);
/**
* MessageFormatter for date.
*/
private SimpleDateFormat formatter =
new SimpleDateFormat("MM/dd/yyyy HH:mm:ss Z");
/**
* Persistent buffer in which we conjure the log.
*/
private StringBuffer buffer = new StringBuffer();
public OneLineSimpleLogger() {
super();
}
public synchronized String format(LogRecord record) {
this.buffer.setLength(0);
this.date.setTime(record.getMillis());
this.position.setBeginIndex(0);
this.formatter.format(this.date, this.buffer, this.position);
this.buffer.append(' ');
if (record.getSourceClassName() != null) {
this.buffer.append(record.getSourceClassName());
} else {
this.buffer.append(record.getLoggerName());
}
this.buffer.append(' ');
this.buffer.append(formatMessage(record));
this.buffer.append(System.getProperty("line.separator"));
if (record.getThrown() != null) {
try {
StringWriter writer = new StringWriter();
PrintWriter printer = new PrintWriter(writer);
record.getThrown().printStackTrace(printer);
writer.close();
this.buffer.append(writer.toString());
} catch (Exception e) {
this.buffer.append("Failed to get stack trace: " +
e.getMessage());
}
}
return this.buffer.toString();
}
}
}

File diff suppressed because it is too large Load diff

View file

@ -79,10 +79,40 @@ message AddressProtocol {
}
/**
* Defines the durable mailbox message.
* Defines akka.remote.DaemonMsgCreate
*/
message DurableMailboxMessageProtocol {
required ActorRefProtocol recipient= 1;
optional ActorRefProtocol sender = 2;
required bytes message = 3;
message DaemonMsgCreateProtocol {
required PropsProtocol props = 1;
required DeployProtocol deploy = 2;
required string path = 3;
required ActorRefProtocol supervisor = 4;
}
/**
* Serialization of akka.actor.Props
*/
message PropsProtocol {
required string dispatcher = 1;
required DeployProtocol deploy = 2;
optional string fromClassCreator = 3;
optional bytes creator = 4;
optional bytes routerConfig = 5;
}
/**
* Serialization of akka.actor.Deploy
*/
message DeployProtocol {
required string path = 1;
optional bytes config = 2;
optional bytes routerConfig = 3;
optional bytes scope = 4;
}
/**
* Serialization of akka.remote.DaemonMsgWatch
*/
message DaemonMsgWatchProtocol {
required ActorRefProtocol watcher = 1;
required ActorRefProtocol watched = 2;
}

View file

@ -14,6 +14,8 @@ akka {
serializers {
proto = "akka.serialization.ProtobufSerializer"
daemon-create = "akka.serialization.DaemonMsgCreateSerializer"
daemon-watch = "akka.serialization.DaemonMsgWatchSerializer"
}
@ -21,6 +23,8 @@ akka {
# Since com.google.protobuf.Message does not extend Serializable but GeneratedMessage
# does, need to use the more specific one here in order to avoid ambiguity
"com.google.protobuf.GeneratedMessage" = proto
"akka.remote.DaemonMsgCreate" = daemon-create
"akka.remote.DaemonMsgWatch" = daemon-watch
}
deployment {

View file

@ -0,0 +1,152 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.serialization
import java.io.Serializable
import com.google.protobuf.ByteString
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Deploy
import akka.actor.ExtendedActorSystem
import akka.actor.NoScopeGiven
import akka.actor.Props
import akka.actor.Scope
import akka.remote.DaemonMsgCreate
import akka.remote.RemoteProtocol.ActorRefProtocol
import akka.remote.RemoteProtocol.DaemonMsgCreateProtocol
import akka.remote.RemoteProtocol.DeployProtocol
import akka.remote.RemoteProtocol.PropsProtocol
import akka.routing.NoRouter
import akka.routing.RouterConfig
import akka.actor.FromClassCreator
/**
* Serializes akka's internal DaemonMsgCreate using protobuf
* for the core structure of DaemonMsgCreate, Props and Deploy.
* Serialization of contained RouterConfig, Config, and Scope
* is done with configured serializer for those classes, by
* default java.io.Serializable.
*/
class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) extends Serializer {
import ProtobufSerializer.serializeActorRef
import ProtobufSerializer.deserializeActorRef
def includeManifest: Boolean = false
def identifier = 3
lazy val serialization = SerializationExtension(system)
def toBinary(obj: AnyRef): Array[Byte] = obj match {
case DaemonMsgCreate(props, deploy, path, supervisor)
def deployProto(d: Deploy): DeployProtocol = {
val builder = DeployProtocol.newBuilder.setPath(d.path)
if (d.config != ConfigFactory.empty)
builder.setConfig(serialize(d.config))
if (d.routerConfig != NoRouter)
builder.setRouterConfig(serialize(d.routerConfig))
if (d.scope != NoScopeGiven)
builder.setScope(serialize(d.scope))
builder.build
}
def propsProto = {
val builder = PropsProtocol.newBuilder.
setDispatcher(props.dispatcher).
setDeploy(deployProto(props.deploy))
props.creator match {
case FromClassCreator(clazz) builder.setFromClassCreator(clazz.getName)
case creator builder.setCreator(serialize(creator))
}
if (props.routerConfig != NoRouter)
builder.setRouterConfig(serialize(props.routerConfig))
builder.build
}
DaemonMsgCreateProtocol.newBuilder.
setProps(propsProto).
setDeploy(deployProto(deploy)).
setPath(path).
setSupervisor(serializeActorRef(supervisor)).
build.toByteArray
case _
throw new IllegalArgumentException(
"Can't serialize a non-DaemonMsgCreate message using DaemonMsgCreateSerializer [%s]".format(obj))
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
val proto = DaemonMsgCreateProtocol.parseFrom(bytes)
def deploy(protoDeploy: DeployProtocol) = {
val config =
if (protoDeploy.hasConfig) deserialize(protoDeploy.getConfig, classOf[Config])
else ConfigFactory.empty
val routerConfig =
if (protoDeploy.hasRouterConfig) deserialize(protoDeploy.getRouterConfig, classOf[RouterConfig])
else NoRouter
val scope =
if (protoDeploy.hasScope) deserialize(protoDeploy.getScope, classOf[Scope])
else NoScopeGiven
Deploy(protoDeploy.getPath, config, routerConfig, scope)
}
def props = {
val creator =
if (proto.getProps.hasFromClassCreator) {
system.dynamicAccess.getClassFor(proto.getProps.getFromClassCreator) match {
case Right(clazz) FromClassCreator(clazz)
case Left(e) throw e
}
} else {
deserialize(proto.getProps.getCreator, classOf[() Actor])
}
val routerConfig =
if (proto.getProps.hasRouterConfig) deserialize(proto.getProps.getRouterConfig, classOf[RouterConfig])
else NoRouter
Props(
creator = creator,
dispatcher = proto.getProps.getDispatcher,
routerConfig = routerConfig,
deploy = deploy(proto.getProps.getDeploy))
}
DaemonMsgCreate(
props = props,
deploy = deploy(proto.getDeploy),
path = proto.getPath,
supervisor = deserializeActorRef(system, proto.getSupervisor))
}
protected def serialize(any: AnyRef): ByteString =
serialization.serialize(any) match {
case Right(bytes) ByteString.copyFrom(bytes)
case Left(e) throw e
}
protected def deserialize[T: ClassManifest](data: ByteString, clazz: Class[T]): T = {
val bytes = data.toByteArray
serialization.deserialize(bytes, clazz) match {
case Right(x) if classManifest[T].erasure.isInstance(x) x.asInstanceOf[T]
case Right(other) throw new IllegalArgumentException("Can't deserialize to [%s], got [%s]".
format(clazz.getName, other))
case Left(e)
// Fallback to the java serializer, because some interfaces don't implement java.io.Serializable,
// but the impl instance does. This could be optimized by adding java serializers in reference.conf:
// com.typesafe.config.Config
// akka.routing.RouterConfig
// akka.actor.Scope
serialization.deserialize(bytes, classOf[java.io.Serializable]) match {
case Right(x) if classManifest[T].erasure.isInstance(x) x.asInstanceOf[T]
case _ throw e // the first exception
}
}
}
}

View file

@ -0,0 +1,41 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.serialization
import akka.actor.ActorRef
import akka.remote.DaemonMsgWatch
import akka.remote.RemoteProtocol.ActorRefProtocol
import akka.remote.RemoteProtocol.DaemonMsgWatchProtocol
import akka.actor.ExtendedActorSystem
/**
* Serializes akka's internal DaemonMsgWatch using protobuf.
*/
class DaemonMsgWatchSerializer(val system: ExtendedActorSystem) extends Serializer {
import ProtobufSerializer.serializeActorRef
import ProtobufSerializer.deserializeActorRef
def includeManifest: Boolean = false
def identifier = 4
def toBinary(obj: AnyRef): Array[Byte] = obj match {
case DaemonMsgWatch(watcher, watched)
DaemonMsgWatchProtocol.newBuilder.
setWatcher(serializeActorRef(watcher)).
setWatched(serializeActorRef(watched)).
build.toByteArray
case _
throw new IllegalArgumentException(
"Can't serialize a non-DaemonMsgWatch message using DaemonMsgWatchSerializer [%s]".format(obj))
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
val proto = DaemonMsgWatchProtocol.parseFrom(bytes)
DaemonMsgWatch(
watcher = deserializeActorRef(system, proto.getWatcher),
watched = deserializeActorRef(system, proto.getWatched))
}
}

View file

@ -6,6 +6,32 @@ package akka.serialization
import com.google.protobuf.Message
import akka.actor.DynamicAccess
import akka.remote.RemoteProtocol.ActorRefProtocol
import akka.actor.ActorSystem
import akka.actor.ActorRef
object ProtobufSerializer {
/**
* Helper to serialize an [[akka.actor.ActorRef]] to Akka's
* protobuf representation.
*/
def serializeActorRef(ref: ActorRef): ActorRefProtocol = {
val identifier: String = Serialization.currentTransportAddress.value match {
case null ref.path.toString
case address ref.path.toStringWithAddress(address)
}
ActorRefProtocol.newBuilder.setPath(identifier).build
}
/**
* Helper to materialize (lookup) an [[akka.actor.ActorRef]]
* from Akka's protobuf representation in the supplied
* [[akka.actor.ActorSystem].
*/
def deserializeActorRef(system: ActorSystem, refProtocol: ActorRefProtocol): ActorRef =
system.actorFor(refProtocol.getPath)
}
/**
* This Serializer serializes `com.google.protobuf.Message`s

View file

@ -0,0 +1,113 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.serialization
import com.typesafe.config.ConfigFactory
import akka.testkit.AkkaSpec
import akka.actor.Actor
import akka.actor.Address
import akka.actor.Props
import akka.actor.Deploy
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy
import akka.remote.DaemonMsgCreate
import akka.remote.RemoteScope
import akka.routing.RoundRobinRouter
import akka.routing.FromConfig
import akka.util.duration._
import akka.actor.FromClassCreator
object DaemonMsgCreateSerializerSpec {
class MyActor extends Actor {
def receive = {
case _
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DaemonMsgCreateSerializerSpec extends AkkaSpec {
import DaemonMsgCreateSerializerSpec._
val ser = SerializationExtension(system)
val supervisor = system.actorOf(Props[MyActor], "supervisor")
"Serialization" must {
"resolve DaemonMsgCreateSerializer" in {
ser.serializerFor(classOf[DaemonMsgCreate]).getClass must be(classOf[DaemonMsgCreateSerializer])
}
"serialize and de-serialize DaemonMsgCreate with FromClassCreator" in {
verifySerialization {
DaemonMsgCreate(
props = Props[MyActor],
deploy = Deploy(),
path = "foo",
supervisor = supervisor)
}
}
"serialize and de-serialize DaemonMsgCreate with function creator" in {
verifySerialization {
DaemonMsgCreate(
props = Props().withCreator(new MyActor),
deploy = Deploy(),
path = "foo",
supervisor = supervisor)
}
}
"serialize and de-serialize DaemonMsgCreate with Deploy and RouterConfig" in {
verifySerialization {
// Duration.Inf doesn't equal Duration.Inf, so we use another for test
val supervisorStrategy = OneForOneStrategy(3, 10 seconds) {
case _ SupervisorStrategy.Escalate
}
val deploy1 = Deploy(
path = "path1",
config = ConfigFactory.parseString("a=1"),
routerConfig = RoundRobinRouter(nrOfInstances = 5, supervisorStrategy = supervisorStrategy),
scope = RemoteScope(Address("akka", "Test", "host1", 1921)))
val deploy2 = Deploy(
path = "path2",
config = ConfigFactory.parseString("a=2"),
routerConfig = FromConfig,
scope = RemoteScope(Address("akka", "Test", "host2", 1922)))
DaemonMsgCreate(
props = Props[MyActor].withDispatcher("my-disp").withDeploy(deploy1),
deploy = deploy2,
path = "foo",
supervisor = supervisor)
}
}
def verifySerialization(msg: DaemonMsgCreate): Unit = {
val bytes = ser.serialize(msg) match {
case Left(exception) fail(exception)
case Right(bytes) bytes
}
ser.deserialize(bytes.asInstanceOf[Array[Byte]], classOf[DaemonMsgCreate]) match {
case Left(exception) fail(exception)
case Right(m: DaemonMsgCreate) assertDaemonMsgCreate(msg, m)
}
}
def assertDaemonMsgCreate(expected: DaemonMsgCreate, got: DaemonMsgCreate): Unit = {
// can't compare props.creator when function
if (expected.props.creator.isInstanceOf[FromClassCreator])
assert(got.props.creator === expected.props.creator)
assert(got.props.dispatcher === expected.props.dispatcher)
assert(got.props.dispatcher === expected.props.dispatcher)
assert(got.props.routerConfig === expected.props.routerConfig)
assert(got.props.deploy === expected.props.deploy)
assert(got.deploy === expected.deploy)
assert(got.path === expected.path)
assert(got.supervisor === expected.supervisor)
}
}
}

View file

@ -0,0 +1,49 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.serialization
import akka.testkit.AkkaSpec
import akka.remote.DaemonMsgWatch
import akka.actor.Actor
import akka.actor.Props
object DaemonMsgWatchSerializerSpec {
class MyActor extends Actor {
def receive = {
case _
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DaemonMsgWatchSerializerSpec extends AkkaSpec {
import DaemonMsgWatchSerializerSpec._
val ser = SerializationExtension(system)
"Serialization" must {
"resolve DaemonMsgWatchSerializer" in {
ser.serializerFor(classOf[DaemonMsgWatch]).getClass must be(classOf[DaemonMsgWatchSerializer])
}
"serialize and de-serialize DaemonMsgWatch" in {
val watcher = system.actorOf(Props[MyActor], "watcher")
val watched = system.actorOf(Props[MyActor], "watched")
val msg = DaemonMsgWatch(watcher, watched)
val bytes = ser.serialize(msg) match {
case Left(exception) fail(exception)
case Right(bytes) bytes
}
ser.deserialize(bytes.asInstanceOf[Array[Byte]], classOf[DaemonMsgWatch]) match {
case Left(exception) fail(exception)
case Right(m) assert(m === msg)
}
}
}
}

View file

@ -5,7 +5,7 @@ package akka.testkit
import org.scalatest.{ WordSpec, BeforeAndAfterAll, Tag }
import org.scalatest.matchers.MustMatchers
import akka.actor.{ ActorSystem, ActorSystemImpl }
import akka.actor.ActorSystem
import akka.actor.{ Actor, ActorRef, Props }
import akka.event.{ Logging, LoggingAdapter }
import akka.util.duration._
@ -72,7 +72,7 @@ abstract class AkkaSpec(_system: ActorSystem)
final override def afterAll {
system.shutdown()
try Await.ready(system.asInstanceOf[ActorSystemImpl].terminationFuture, 5 seconds) catch {
try system.awaitTermination(5 seconds) catch {
case _: TimeoutException system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
}
atTermination()

View file

@ -154,7 +154,9 @@ object AkkaBuild extends Build {
base = file("akka-durable-mailboxes/akka-mailboxes-common"),
dependencies = Seq(remote, testkit % "compile;test->test"),
settings = defaultSettings ++ Seq(
libraryDependencies ++= Dependencies.mailboxes
libraryDependencies ++= Dependencies.mailboxes,
// DurableMailboxSpec published in akka-mailboxes-common-test
publishArtifact in Test := true
)
)
@ -257,7 +259,8 @@ object AkkaBuild extends Build {
lazy val docs = Project(
id = "akka-docs",
base = file("akka-docs"),
dependencies = Seq(actor, testkit % "test->test", remote, cluster, slf4j, agent, transactor, fileMailbox, zeroMQ, camel),
dependencies = Seq(actor, testkit % "test->test", mailboxesCommon % "compile;test->test",
remote, cluster, slf4j, agent, transactor, fileMailbox, zeroMQ, camel),
settings = defaultSettings ++ Sphinx.settings ++ Seq(
unmanagedSourceDirectories in Test <<= baseDirectory { _ ** "code" get },
libraryDependencies ++= Dependencies.docs,
@ -380,7 +383,7 @@ object Dependencies {
val fileMailbox = Seq(Test.commonsIo, Test.scalatest, Test.junit)
val kernel = Seq(jmxClient, Test.scalatest, Test.junit)
val kernel = Seq(Test.scalatest, Test.junit)
val camel = Seq(camelCore, Test.scalatest, Test.junit, Test.mockito)
@ -408,7 +411,6 @@ object Dependency {
// Compile
val camelCore = "org.apache.camel" % "camel-core" % V.Camel // ApacheV2
val jmxClient = "cmdline-jmxclient" % "cmdline-jmxclient" % "0.10.3" // LGPL
val netty = "io.netty" % "netty" % V.Netty // ApacheV2
val protobuf = "com.google.protobuf" % "protobuf-java" % V.Protobuf // New BSD
val scalaStm = "org.scala-tools" % "scala-stm_2.9.1" % V.ScalaStm // Modified BSD (Scala)