diff --git a/.gitignore b/.gitignore
index c4a1e0a3be..ef2fb469bb 100755
--- a/.gitignore
+++ b/.gitignore
@@ -1,7 +1,8 @@
*~
*#
src_managed
-project/plugins/project/
+activemq-data
+project/plugins/project
project/boot/*
*/project/build/target
*/project/boot
diff --git a/akka-fun-test-java/pom.xml b/akka-active-object-test/pom.xml
similarity index 54%
rename from akka-fun-test-java/pom.xml
rename to akka-active-object-test/pom.xml
index c8ddef3320..62935a30b4 100644
--- a/akka-fun-test-java/pom.xml
+++ b/akka-active-object-test/pom.xml
@@ -3,24 +3,28 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
4.0.0
- Akka Functional Tests in Java
- akka-fun-test-java
+ Akka Active Object Tests in Java
+ akka-active-object-test
se.scalablesolutions.akka
0.9
jar
- 2.8.0.RC2
- 0.5.2
- 1.1.5
- 1.9.18-i
+ 2.8.0.RC3
embedded-repo
Embedded Repository
- file://Users/jboner/src/scala/akka/embedded-repo
+ file:///Users/jboner/src/scala/akka/embedded-repo
+
+
+
+ jboss
+ JBoss Repository
+ https://repository.jboss.org/nexus/content/groups/public
+
@@ -35,59 +39,9 @@
se.scalablesolutions.akka
- akka-kernel_2.8.0.RC2
+ akka-core_2.8.0.RC3
0.9
-
- se.scalablesolutions.akka
- akka-persistence-cassandra_2.8.0.RC2
- 0.9
-
-
- com.google.protobuf
- protobuf-java
- 2.2.0
-
-
- org.codehaus.jackson
- jackson-core-asl
- 1.2.1
-
-
- org.codehaus.jackson
- jackson-mapper-asl
- 1.2.1
-
-
- com.sun.grizzly
- grizzly-servlet-webserver
- ${grizzly.version}
- test
-
-
- com.sun.jersey
- jersey-server
- ${jersey.version}
- test
-
-
- com.sun.jersey
- jersey-json
- ${jersey.version}
- test
-
-
- com.sun.jersey
- jersey-client
- ${jersey.version}
- test
-
-
- com.sun.jersey
- jersey-atom
- ${jersey.version}
- test
-
junit
junit
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
similarity index 100%
rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/AllTest.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/AllTest.java
similarity index 67%
rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/AllTest.java
rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/AllTest.java
index 2e724f0324..465c9da182 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/AllTest.java
+++ b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/AllTest.java
@@ -8,13 +8,9 @@ public class AllTest extends TestCase {
public static Test suite() {
TestSuite suite = new TestSuite("All Java tests");
suite.addTestSuite(InMemoryStateTest.class);
- //suite.addTestSuite(InMemNestedStateTest.class);
+ suite.addTestSuite(InMemNestedStateTest.class);
suite.addTestSuite(RemoteInMemoryStateTest.class);
suite.addTestSuite(ActiveObjectGuiceConfiguratorTest.class);
- //suite.addTestSuite(PersistentStateTest.class);
- //suite.addTestSuite(PersistentNestedStateTest.class);
- //suite.addTestSuite(RemotePersistentStateTest.class);
- //suite.addTestSuite(RestTest.class);
return suite;
}
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/Bar.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/Bar.java
similarity index 100%
rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/Bar.java
rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/Bar.java
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/BarImpl.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/BarImpl.java
similarity index 100%
rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/BarImpl.java
rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/BarImpl.java
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/Ext.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/Ext.java
similarity index 100%
rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/Ext.java
rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/Ext.java
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ExtImpl.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/ExtImpl.java
similarity index 100%
rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ExtImpl.java
rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/ExtImpl.java
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/Foo.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/Foo.java
similarity index 100%
rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/Foo.java
rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/Foo.java
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemFailer.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemFailer.java
similarity index 100%
rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemFailer.java
rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemFailer.java
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
similarity index 99%
rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
index 992c188fa1..746df950bf 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
+++ b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
@@ -9,7 +9,6 @@ import se.scalablesolutions.akka.config.Config;
import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import static se.scalablesolutions.akka.config.JavaConfig.*;
import se.scalablesolutions.akka.actor.*;
-import se.scalablesolutions.akka.kernel.Kernel;
import junit.framework.TestCase;
public class InMemNestedStateTest extends TestCase {
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
similarity index 100%
rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java
similarity index 100%
rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java
rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java
similarity index 99%
rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java
rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java
index 740bfd892c..3708d58acc 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java
+++ b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java
@@ -13,7 +13,6 @@ import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import static se.scalablesolutions.akka.config.JavaConfig.*;
import se.scalablesolutions.akka.actor.*;
-import se.scalablesolutions.akka.kernel.Kernel;
public class InMemoryStateTest extends TestCase {
static String messageLog = "";
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java
similarity index 100%
rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java
rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java
diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala
index 9ec943cfc6..0780e355dd 100644
--- a/akka-core/src/main/scala/actor/ActiveObject.scala
+++ b/akka-core/src/main/scala/actor/ActiveObject.scala
@@ -66,23 +66,23 @@ final class ActiveObjectConfiguration {
}
/**
- * Holds RTTI (runtime type information) for the Active Object, f.e. current 'sender'
+ * Holds RTTI (runtime type information) for the Active Object, e.g. the current 'sender'
* reference, the 'senderFuture' reference etc.
*
- * In order to make use of this context you have to create a member field in your
- * Active Object that has the type 'ActiveObjectContext', then an instance will
- * be injected for you to use.
+ * In order to make use of this context you have to create a member field in your
+ * Active Object that has the type 'ActiveObjectContext', then an instance will
+ * be injected for you to use.
*
- * This class does not contain static information but is updated by the runtime system
- * at runtime.
+ * This class does not contain static information but is updated by the runtime system
+ * at runtime.
*
- * Here is an example of usage:
+ * Here is an example of usage:
*
* class Ping {
- * // This context will be injected, holds RTTI (runtime type information)
- * // for the current message send
+ * // This context will be injected, holds RTTI (runtime type information)
+ * // for the current message send
* private ActiveObjectContext context = null;
- *
+ *
* public void hit(int count) {
* Pong pong = (Pong) context.getSender();
* pong.hit(count++)
@@ -100,19 +100,19 @@ final class ActiveObjectContext {
* Returns the current sender Active Object reference.
* Scala style getter.
*/
- def sender: AnyRef = {
+ def sender: AnyRef = {
if (_sender eq null) throw new IllegalStateException("Sender reference should not be null.")
else _sender
- }
+ }
/**
* Returns the current sender Active Object reference.
* Java style getter.
*/
- def getSender: AnyRef = {
+ def getSender: AnyRef = {
if (_sender eq null) throw new IllegalStateException("Sender reference should not be null.")
else _sender
- }
+ }
/**
* Returns the current sender future Active Object reference.
@@ -364,7 +364,7 @@ object ActiveObject extends Logging {
proxy.asInstanceOf[T]
}
- private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actorRef: ActorRef,
+ private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actorRef: ActorRef,
remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val context = injectActiveObjectContext(target)
val proxy = Proxy.newInstance(Array(intf), Array(target), false, false)
@@ -462,7 +462,7 @@ object ActiveObject extends Logging {
if (parent != null) injectActiveObjectContext0(activeObject, parent)
else {
log.warning(
- "Can't set 'ActiveObjectContext' for ActiveObject [%s] since no field of this type could be found.",
+ "Can't set 'ActiveObjectContext' for ActiveObject [%s] since no field of this type could be found.",
activeObject.getClass.getName)
None
}
@@ -522,7 +522,7 @@ private[akka] sealed class ActiveObjectAspect {
remoteAddress = init.remoteAddress
timeout = init.timeout
isInitialized = true
-
+
}
dispatch(joinPoint)
}
@@ -583,7 +583,7 @@ private[akka] sealed class ActiveObjectAspect {
} else future.result
private def isVoid(rtti: MethodRtti) = rtti.getMethod.getReturnType == java.lang.Void.TYPE
-
+
private def escapeArguments(args: Array[AnyRef]): Tuple2[Array[AnyRef], Boolean] = {
var isEscaped = false
val escapedArgs = for (arg <- args) yield {
@@ -606,11 +606,11 @@ private[akka] sealed class ActiveObjectAspect {
joinPoint: JoinPoint, isOneWay: Boolean, isVoid: Boolean, sender: AnyRef, senderFuture: CompletableFuture[Any]) {
override def toString: String = synchronized {
- "Invocation [joinPoint: " + joinPoint.toString +
- ", isOneWay: " + isOneWay +
+ "Invocation [joinPoint: " + joinPoint.toString +
+ ", isOneWay: " + isOneWay +
", isVoid: " + isVoid +
- ", sender: " + sender +
- ", senderFuture: " + senderFuture +
+ ", sender: " + sender +
+ ", senderFuture: " + senderFuture +
"]"
}
@@ -653,11 +653,11 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
private var postRestart: Option[Method] = None
private var initTxState: Option[Method] = None
private var context: Option[ActiveObjectContext] = None
-
+
def this(transactionalRequired: Boolean) = this(transactionalRequired,None)
private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef, ctx: Option[ActiveObjectContext]) = {
- if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired))
+ if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired))
self.makeTransactionRequired
self.id = targetClass.getName
target = Some(targetInstance)
@@ -705,7 +705,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
def receive = {
case Invocation(joinPoint, isOneWay, _, sender, senderFuture) =>
- context.foreach { ctx =>
+ context.foreach { ctx =>
if (sender ne null) ctx._sender = sender
if (senderFuture ne null) ctx._senderFuture = senderFuture
}
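As a side note on the ActiveObjectContext Scaladoc reformatted above, here is a minimal Scala sketch of the described field injection; the Pong class is illustrative only and not part of this patch.

```scala
import se.scalablesolutions.akka.actor.ActiveObjectContext

class Pong {
  // Declare a field of type ActiveObjectContext; the runtime injects an
  // instance before the Active Object is used, as described in the Scaladoc.
  private var context: ActiveObjectContext = null

  def hit(count: Int) {
    // getSender returns the Active Object that invoked this method.
    val caller = context.getSender
    println("hit " + count + " from " + caller)
  }
}
```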
diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala
index 2a9e70bde9..3b7d62f97f 100644
--- a/akka-core/src/main/scala/actor/Actor.scala
+++ b/akka-core/src/main/scala/actor/Actor.scala
@@ -295,8 +295,10 @@ trait Actor extends Logging {
type Receive = Actor.Receive
/*
- * For internal use only, functions as the implicit sender references when invoking
- * one of the message send functions (!, !! and !!!).
+ * Option[ActorRef] representation of the 'self' ActorRef reference.
+ *
+ * Mainly for internal use, functions as the implicit sender reference when invoking
+ * one of the message send functions ('!', '!!' and '!!!').
*/
implicit val optionSelf: Option[ActorRef] = {
val ref = Actor.actorRefInCreation.value
@@ -313,8 +315,10 @@ trait Actor extends Logging {
}
/*
- * For internal use only, functions as the implicit sender references when invoking
- * the forward function.
+ * Some[ActorRef] representation of the 'self' ActorRef reference.
+ *
+ * Mainly for internal use, functions as the implicit sender reference when invoking
+ * the 'forward' function.
*/
implicit val someSelf: Some[ActorRef] = optionSelf.asInstanceOf[Some[ActorRef]]
@@ -325,9 +329,31 @@ trait Actor extends Logging {
*
* self ! message
*
+ * Here you also find most of the Actor API.
+ *
+ * For example fields like:
+ *
*/
- val self: ActorRef = optionSelf.get
- self.id = getClass.getName
+ val self: ActorRef = {
+ val zelf = optionSelf.get
+ zelf.id = getClass.getName
+ zelf
+ }
/**
* User overridable callback/setting.
@@ -339,64 +365,64 @@ trait Actor extends Logging {
*
* def receive = {
* case Ping =>
- * println("got a ping")
+ * log.info("got a 'Ping' message")
* self.reply("pong")
*
* case OneWay =>
- * println("got a oneway")
+ * log.info("got a 'OneWay' message")
*
- * case _ =>
- * println("unknown message, ignoring")
+ * case unknown =>
+ * log.warning("unknown message [%s], ignoring", unknown)
* }
*
*/
protected def receive: Receive
/**
- * User overridable callback/setting.
+ * User overridable callback.
*
- * Optional callback method that is called during initialization.
- * To be implemented by subclassing actor.
+ * Is called when an Actor is started by invoking 'actor.start'.
*/
def init {}
/**
- * User overridable callback/setting.
+ * User overridable callback.
*
- * Mandatory callback method that is called during restart and reinitialization after a server crash.
- * To be implemented by subclassing actor.
+ * Is called when 'actor.stop' is invoked.
+ */
+ def shutdown {}
+
+ /**
+ * User overridable callback.
+ *
+ * Is called on a crashed Actor right BEFORE it is restarted, to allow clean-up of resources before the Actor is terminated.
*/
def preRestart(reason: Throwable) {}
/**
- * User overridable callback/setting.
+ * User overridable callback.
*
- * Mandatory callback method that is called during restart and reinitialization after a server crash.
- * To be implemented by subclassing actor.
+ * Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
*/
def postRestart(reason: Throwable) {}
/**
- * User overridable callback/setting.
+ * User overridable callback.
*
- * Optional callback method that is called during termination.
- * To be implemented by subclassing actor.
+ * Is called during initialization. Can be used to initialize transactional state. Will be invoked within a transaction.
*/
def initTransactionalState {}
- /**
- * User overridable callback/setting.
- *
- * Optional callback method that is called during termination.
- * To be implemented by subclassing actor.
- */
- def shutdown {}
-
// =========================================
// ==== INTERNAL IMPLEMENTATION DETAILS ====
// =========================================
- private[akka] def base: Receive = lifeCycles orElse (self.hotswap getOrElse receive)
+ private[akka] def base: Receive = try {
+ lifeCycles orElse (self.hotswap getOrElse receive)
+ } catch {
+ case e: NullPointerException => throw new IllegalStateException(
+ "The 'self' ActorRef reference for [" + getClass.getName + "] is NULL, error in the ActorRef initialization process.")
+ }
private val lifeCycles: Receive = {
case HotSwap(code) => self.hotswap = code
@@ -414,39 +440,3 @@ trait Actor extends Logging {
override def toString = self.toString
}
-
-/**
- * Base class for the different dispatcher types.
- *
- * @author Jonas Bonér
- */
-sealed abstract class DispatcherType
-
-/**
- * Module that holds the different dispatcher types.
- *
- * @author Jonas Bonér
- */
-object DispatcherType {
- case object EventBasedThreadPooledProxyInvokingDispatcher extends DispatcherType
- case object EventBasedSingleThreadDispatcher extends DispatcherType
- case object EventBasedThreadPoolDispatcher extends DispatcherType
- case object ThreadBasedDispatcher extends DispatcherType
-}
-
-/**
- * Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model':
- * http://en.wikipedia.org/wiki/Actor_model
- *
- * An actor has a well-defined (non-cyclic) life-cycle.
- *
- * => NEW (newly created actor) - can't receive messages (yet)
- * => STARTED (when 'start' is invoked) - can receive messages
- * => SHUT DOWN (when 'exit' is invoked) - can't do anything
- *
- *
- * @author Jonas Bonér
- */
-class ActorMessageInvoker private[akka] (val actorRef: ActorRef) extends MessageInvoker {
- def invoke(handle: MessageInvocation) = actorRef.invoke(handle)
-}
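For reference, a minimal sketch of an actor using the renamed lifecycle callbacks documented above ('init' on start, 'shutdown' on stop, 'preRestart'/'postRestart' around a restart); the Worker class and its messages are illustrative only.

```scala
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._

class Worker extends Actor {
  override def init     { log.info("started via 'actor.start'") }
  override def shutdown { log.info("stopped via 'actor.stop'") }
  override def preRestart(reason: Throwable)  { log.info("cleaning up before restart") }
  override def postRestart(reason: Throwable) { log.info("reinitializing after restart") }

  def receive = {
    case "ping"  => self.reply("pong")
    case unknown => log.warning("unknown message [%s], ignoring", unknown)
  }
}

// Usage: the hooks fire as the actor is started and stopped.
val worker = actorOf(new Worker).start
worker ! "ping"
worker.stop
```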
diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala
index 2491863a11..3cece45a18 100644
--- a/akka-core/src/main/scala/actor/ActorRef.scala
+++ b/akka-core/src/main/scala/actor/ActorRef.scala
@@ -222,7 +222,7 @@ trait ActorRef extends TransactionManagement {
* Is defined if the message was sent with sent with '!!' or '!!!', else None.
*/
def senderFuture: Option[CompletableFuture[Any]] = guard.withGuard { _senderFuture }
-
+
/**
* Is the actor being restarted?
*/
@@ -300,22 +300,6 @@ trait ActorRef extends TransactionManagement {
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
- /**
- * Sends a message asynchronously and waits on a future for a reply message.
- * Uses the time-out defined in the Actor.
- *
- * It waits on the reply either until it receives it (in the form of Some(replyMessage))
- * or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics.
- *
- * NOTE:
- * Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
- * implement request/response message exchanges.
- *
- * If you are sending messages using !! then you have to use self.reply(..)
- * to send a reply message to the original sender. If not then the sender will block until the timeout expires.
- */
-// def !(implicit sender: Option[ActorRef] = None): Option[T] = !
-
/**
* Sends a message asynchronously returns a future holding the eventual reply message.
*
@@ -349,14 +333,15 @@ trait ActorRef extends TransactionManagement {
* Use self.reply(..) to reply with a message to the original sender of the message currently
* being processed.
*
- * Throws an IllegalStateException if unable to determine what to reply to
+ * Throws an IllegalStateException if unable to determine what to reply to.
*/
def reply(message: Any) = if(!reply_?(message)) throw new IllegalStateException(
"\n\tNo sender in scope, can't reply. " +
"\n\tYou have probably: " +
"\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." +
- "\n\t\t2. Invoked a method on an Active Object from an instance NOT an Active Object.")
-
+ "\n\t\t2. Invoked a method on an Active Object from an instance NOT an Active Object." +
+ "\n\tElse you might want to use 'reply_?' which returns Boolean(true) if succes and Boolean(false) if no sender in scope")
+
/**
* Use reply_?(..) to reply with a message to the original sender of the message currently
* being processed.
@@ -595,7 +580,7 @@ sealed class LocalActorRef private[akka](
@volatile private[akka] var _supervisor: Option[ActorRef] = None
protected[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation]
- protected[this] val actorInstance = new AtomicReference[Actor](newActor)
+ protected[this] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) }
@volatile private var isInInitialization = false
@volatile private var runActorInitialization = false
@@ -1224,7 +1209,7 @@ private[akka] case class RemoteActorRef private[akka] (
extends ActorRef {
_uuid = uuuid
timeout = _timeout
-
+
start
lazy val remoteClient = RemoteClient.clientFor(hostname, port, loader)
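A short sketch of the 'reply_?' alternative that the expanded error message above points to: it returns false instead of throwing when no sender is in scope. The Echo actor is illustrative.

```scala
import se.scalablesolutions.akka.actor.Actor

class Echo extends Actor {
  def receive = {
    case message =>
      // reply_? returns true if a sender was in scope and the reply was sent,
      // false otherwise, instead of throwing like reply(..) does.
      if (!self.reply_?(message))
        log.warning("no sender in scope, dropping reply for [%s]", message)
  }
}
```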
diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala
index ea77b2c6b7..ee1b6e2b4f 100644
--- a/akka-core/src/main/scala/actor/ActorRegistry.scala
+++ b/akka-core/src/main/scala/actor/ActorRegistry.scala
@@ -65,6 +65,12 @@ object ActorRegistry extends Logging {
all.toList
}
+ /**
+ * Finds any actor that matches T.
+ */
+ def actorFor[T <: Actor](implicit manifest: Manifest[T]): Option[ActorRef] =
+ actorsFor[T](manifest).headOption
+
/**
* Finds all actors of the exact type specified by the class passed in as the Class argument.
*/
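A usage sketch for the new 'actorFor[T]' lookup added above; 'MyActor' is a hypothetical user-defined actor, started so the registry can find it.

```scala
import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorRegistry}
import se.scalablesolutions.akka.actor.Actor._

class MyActor extends Actor {
  def receive = { case msg => self.reply(msg) }
}

actorOf(new MyActor).start  // registered in the ActorRegistry on start

// actorFor[T] is actorsFor[T].headOption: the first registered actor matching T, if any.
val ref: Option[ActorRef] = ActorRegistry.actorFor[MyActor]
ref.foreach(_ ! "ping")
```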
diff --git a/akka-core/src/main/scala/actor/Agent.scala b/akka-core/src/main/scala/actor/Agent.scala
index e65a5a5b74..b800c94f23 100644
--- a/akka-core/src/main/scala/actor/Agent.scala
+++ b/akka-core/src/main/scala/actor/Agent.scala
@@ -88,7 +88,8 @@ class AgentException private[akka](message: String) extends RuntimeException(mes
*
*
*
-* IMPORTANT:
+* IMPORTANT:
+*
* You can *not* call 'agent.get', 'agent()' or use the monadic 'foreach',
* 'map' and 'flatMap' within an enclosing transaction since that would block
* the transaction indefinitely. But all other operations are fine. The system
@@ -103,7 +104,6 @@ sealed class Agent[T] private (initialValue: T) {
import Actor._
private val dispatcher = actorOf(new AgentDispatcher[T](initialValue)).start
- dispatcher ! Value(initialValue)
/**
* Submits a request to read the internal state.
@@ -215,7 +215,7 @@ final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transacto
import Actor._
log.debug("Starting up Agent [%s]", self.uuid)
- private lazy val value = Ref[T]()
+ private val value = Ref[T](initialValue)
/**
* Periodically handles incoming messages.
@@ -233,6 +233,5 @@ final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transacto
* Performs a CAS operation, atomically swapping the internal state with the value
* provided as a by-name parameter.
*/
- private final def swap(newData: => T): Unit = value.swap(newData)
+ private def swap(newData: => T): Unit = value.swap(newData)
}
-
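A small sketch of the read-side constraint from the Agent Scaladoc above: reads such as 'get', 'apply()' and 'foreach' block until the value is available and must not be used inside an enclosing transaction. The 'Agent(0)' factory call and the import path are assumptions, not confirmed by this patch.

```scala
import se.scalablesolutions.akka.actor.Agent  // assumed package

val counter = Agent(0)  // assumed companion factory taking the initial value

// Fine outside a transaction: these block until the dispatcher answers.
val current: Int = counter.get
counter.foreach(value => println("counter is " + value))

// Per the Scaladoc above, the same reads must NOT be issued from within an
// enclosing transaction, since they would block that transaction indefinitely.
```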
diff --git a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala
index a44db8fd07..47057b24a7 100644
--- a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala
+++ b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala
@@ -7,10 +7,43 @@ package se.scalablesolutions.akka.actor
import java.io.File
import java.net.{URL, URLClassLoader}
import java.util.jar.JarFile
+import java.util.Enumeration
import se.scalablesolutions.akka.util.{Bootable, Logging}
import se.scalablesolutions.akka.config.Config._
+class AkkaDeployClassLoader(urls : List[URL], parent : ClassLoader) extends URLClassLoader(urls.toArray.asInstanceOf[Array[URL]],parent)
+{
+ override def findResources(resource : String) = {
+ val normalResult = super.findResources(resource)
+ if(normalResult.hasMoreElements) normalResult else findDeployed(resource)
+ }
+
+ def findDeployed(resource : String) = new Enumeration[URL]{
+ private val it = getURLs.flatMap( listClassesInPackage(_,resource) ).iterator
+ def hasMoreElements = it.hasNext
+ def nextElement = it.next
+ }
+
+ def listClassesInPackage(jar : URL, pkg : String) = {
+ val f = new File(jar.getFile)
+ val jf = new JarFile(f)
+ try {
+ val es = jf.entries
+ var result = List[URL]()
+ while(es.hasMoreElements)
+ {
+ val e = es.nextElement
+ if(!e.isDirectory && e.getName.startsWith(pkg) && e.getName.endsWith(".class"))
+ result ::= new URL("jar:" + f.toURI.toURL + "!/" + e)
+ }
+ result
+ } finally {
+ jf.close
+ }
+ }
+}
+
/**
* Handles all modules in the deploy directory (load and unload)
*/
@@ -46,10 +79,7 @@ trait BootableActorLoaderService extends Bootable with Logging {
log.debug("Loading dependencies [%s]", dependencyJars)
val allJars = toDeploy ::: dependencyJars
- URLClassLoader.newInstance(
- allJars.toArray.asInstanceOf[Array[URL]],
- Thread.currentThread.getContextClassLoader)
- //parentClassLoader)
+ new AkkaDeployClassLoader(allJars,Thread.currentThread.getContextClassLoader)
} else Thread.currentThread.getContextClassLoader)
}
diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala
index 3028a87c60..c13a345efc 100644
--- a/akka-core/src/main/scala/actor/Supervisor.scala
+++ b/akka-core/src/main/scala/actor/Supervisor.scala
@@ -5,7 +5,7 @@
package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.config.ScalaConfig._
-import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy, ConfiguratorRepository, Configurator}
+import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.remote.RemoteServer
import Actor._
@@ -120,18 +120,17 @@ class SupervisorFactory private[akka] (val config: SupervisorConfig) extends Log
* @author Jonas Bonér
*/
sealed class Supervisor private[akka] (
- handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]])
- extends Configurator {
+ handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]]) {
import Supervisor._
- private val childActors = new ConcurrentHashMap[String, List[ActorRef]]
- private val childSupervisors = new CopyOnWriteArrayList[Supervisor]
- private[akka] val supervisor = SupervisorActor(handler, trapExceptions)
+ private val _childActors = new ConcurrentHashMap[String, List[ActorRef]]
+ private val _childSupervisors = new CopyOnWriteArrayList[Supervisor]
+
+ private[akka] val supervisor = actorOf(new SupervisorActor(handler, trapExceptions)).start
def uuid = supervisor.uuid
def start: Supervisor = {
- ConfiguratorRepository.registerConfigurator(this)
this
}
@@ -141,15 +140,11 @@ sealed class Supervisor private[akka] (
def unlink(child: ActorRef) = supervisor.unlink(child)
- // FIXME recursive search + do not fix if we remove feature that Actors can be RESTful usin Jersey annotations
- def getInstance[T](clazz: Class[T]): List[T] = childActors.get(clazz.getName).asInstanceOf[List[T]]
+ def children: List[ActorRef] =
+ _childActors.values.toArray.toList.asInstanceOf[List[List[ActorRef]]].flatten
- // FIXME recursive search + do not fix if we remove feature that Actors can be RESTful usin Jersey annotations
- def getComponentInterfaces: List[Class[_]] =
- childActors.values.toArray.toList.asInstanceOf[List[List[AnyRef]]].flatten.map(_.getClass)
-
- // FIXME recursive search + do not fix if we remove feature that Actors can be RESTful usin Jersey annotations
- def isDefined(clazz: Class[_]): Boolean = childActors.containsKey(clazz.getName)
+ def childSupervisors: List[Supervisor] =
+ _childSupervisors.toArray.toList.asInstanceOf[List[Supervisor]]
def configure(config: SupervisorConfig): Unit = config match {
case SupervisorConfig(_, servers) =>
@@ -159,11 +154,11 @@ sealed class Supervisor private[akka] (
actorRef.start
val className = actorRef.actor.getClass.getName
val currentActors = {
- val list = childActors.get(className)
+ val list = _childActors.get(className)
if (list eq null) List[ActorRef]()
else list
}
- childActors.put(className, actorRef :: currentActors)
+ _childActors.put(className, actorRef :: currentActors)
actorRef.lifeCycle = Some(lifeCycle)
supervisor.link(actorRef)
remoteAddress.foreach(address =>
@@ -171,7 +166,7 @@ sealed class Supervisor private[akka] (
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
val childSupervisor = Supervisor(supervisorConfig)
supervisor.link(childSupervisor.supervisor)
- childSupervisors.add(childSupervisor)
+ _childSupervisors.add(childSupervisor)
})
}
}
diff --git a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
index 9785ea2082..8d3a089d26 100644
--- a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
+++ b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
@@ -40,7 +40,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
* @param clazz the class for the active object
* @return the active objects for the class
*/
- override def getInstance[T](clazz: Class[T]): List[T] = synchronized {
+ def getInstance[T](clazz: Class[T]): List[T] = synchronized {
log.debug("Retrieving active object [%s]", clazz.getName)
if (injector eq null) throw new IllegalStateException(
"inject() and/or supervise() must be called before invoking getInstance(clazz)")
@@ -52,7 +52,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
List(proxy.asInstanceOf[T])
}
- override def isDefined(clazz: Class[_]): Boolean = synchronized {
+ def isDefined(clazz: Class[_]): Boolean = synchronized {
activeObjectRegistry.get(clazz).isDefined
}
@@ -60,7 +60,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
injector.getInstance(clazz).asInstanceOf[T]
}
- override def getComponentInterfaces: List[Class[_]] =
+ def getComponentInterfaces: List[Class[_]] =
for (c <- components) yield {
if (c.intf.isDefined) c.intf.get
else c.target
@@ -122,7 +122,6 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
override def supervise: ActiveObjectConfiguratorBase = synchronized {
if (injector eq null) inject
supervisor = Some(ActiveObject.supervise(restartStrategy, supervised))
- ConfiguratorRepository.registerConfigurator(this)
this
}
diff --git a/akka-core/src/main/scala/config/Configurator.scala b/akka-core/src/main/scala/config/Configurator.scala
index 58e37ce931..db92c5f35b 100644
--- a/akka-core/src/main/scala/config/Configurator.scala
+++ b/akka-core/src/main/scala/config/Configurator.scala
@@ -6,24 +6,7 @@ package se.scalablesolutions.akka.config
import ScalaConfig.{RestartStrategy, Component}
-/**
- * Manages the active abject or actor that has been put under supervision for the class specified.
- */
-private[akka] trait Configurator {
- /**
- * Returns the active abject or actor that has been put under supervision for the class specified.
- *
- * @param clazz the class for the active object
- * @return the active object for the class
- */
- def getInstance[T](clazz: Class[T]): List[T]
-
- def getComponentInterfaces: List[Class[_]]
-
- def isDefined(clazz: Class[_]): Boolean
-}
-
-private[akka] trait ActiveObjectConfiguratorBase extends Configurator {
+private[akka] trait ActiveObjectConfiguratorBase {
def getExternalDependency[T](clazz: Class[T]): T
def configure(restartStrategy: RestartStrategy, components: List[Component]): ActiveObjectConfiguratorBase
diff --git a/akka-core/src/main/scala/config/ConfiguratorRepository.scala b/akka-core/src/main/scala/config/ConfiguratorRepository.scala
deleted file mode 100644
index 8ddc16f9fe..0000000000
--- a/akka-core/src/main/scala/config/ConfiguratorRepository.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Scalable Solutions AB
- */
-
-package se.scalablesolutions.akka.config
-
-import scala.collection.mutable.HashSet
-
-import se.scalablesolutions.akka.util.Logging
-
-object ConfiguratorRepository extends Logging {
-
- private val configuration = new HashSet[Configurator]
-
- def registerConfigurator(conf: Configurator) = synchronized {
- configuration += conf
- }
-
- def getConfigurators: List[Configurator] = synchronized {
- configuration.toList
- //configurations.getOrElse(ctx, throw new IllegalArgumentException("No configuration for servlet context [" + ctx + "]"))
- }
-}
-
-class ConfiguratorRepository extends Logging {
- def registerConfigurator(conf: Configurator) = ConfiguratorRepository.registerConfigurator(conf)
- def getConfigurators: List[Configurator] = ConfiguratorRepository.getConfigurators
-}
-
diff --git a/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala
index 72e94526ce..832ae9203a 100644
--- a/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala
@@ -7,19 +7,19 @@ package se.scalablesolutions.akka.dispatch
import java.util.{LinkedList, Queue, List}
import java.util.HashMap
-import se.scalablesolutions.akka.actor.{ActorMessageInvoker, Actor, ActorRef}
+import se.scalablesolutions.akka.actor.{Actor, ActorRef}
abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) extends MessageDispatcher {
@volatile protected var active: Boolean = false
protected val queue = new ReactiveMessageQueue(name)
- protected val messageInvokers = new HashMap[AnyRef, MessageInvoker]
+ protected val messageInvokers = new HashMap[ActorRef, ActorRef]
protected var selectorThread: Thread = _
protected val guard = new Object
def dispatch(invocation: MessageInvocation) = queue.append(invocation)
override def register(actorRef: ActorRef) = synchronized {
- messageInvokers.put(actorRef, new ActorMessageInvoker(actorRef))
+ messageInvokers.put(actorRef, actorRef)
super.register(actorRef)
}
diff --git a/akka-core/src/main/scala/dispatch/Dispatchers.scala b/akka-core/src/main/scala/dispatch/Dispatchers.scala
index 95e77625ec..e938e36e4e 100644
--- a/akka-core/src/main/scala/dispatch/Dispatchers.scala
+++ b/akka-core/src/main/scala/dispatch/Dispatchers.scala
@@ -40,7 +40,7 @@ import se.scalablesolutions.akka.config.Config.config
* @author Jonas Bonér
*/
object Dispatchers {
- val THROUGHPUT = config.getInt("akka.dispatcher.throughput", 5)
+ val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") {
override def register(actor: ActorRef) = {
diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
index 185c28ff5b..ccbb9edc94 100644
--- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
@@ -54,6 +54,10 @@ import se.scalablesolutions.akka.actor.ActorRef
* the {@link se.scalablesolutions.akka.dispatch.Dispatchers} factory object.
*
* @author Jonas Bonér
+ * @param throughput a positive integer means the dispatcher will process at most that many messages at a time from an
+ * actor's mailbox, without checking the mailboxes of other actors. Zero or negative means the dispatcher
+ * always continues until the mailbox is empty.
+ * Larger values (or zero or negative) increase throughput, smaller values increase fairness.
*/
class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispatchers.THROUGHPUT) extends MessageDispatcher with ThreadPoolBuilder {
def this(_name: String) = this(_name, Dispatchers.THROUGHPUT) // Needed for Java API usage
@@ -70,30 +74,20 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat
def run = {
var lockAcquiredOnce = false
var finishedBeforeMailboxEmpty = false
- // this do-while loop is required to prevent missing new messages between the end of the inner while
- // loop and releasing the lock
val lock = receiver.dispatcherLock
val mailbox = receiver.mailbox
+ // this do-while loop is required to prevent missing new messages between the end of the inner while
+ // loop and releasing the lock
do {
if (lock.tryLock) {
+ // Only dispatch if we got the lock. Otherwise another thread is already dispatching.
lockAcquiredOnce = true
try {
- // Only dispatch if we got the lock. Otherwise another thread is already dispatching.
- var i = 0
- var messageInvocation = mailbox.poll
- while (messageInvocation != null) {
- messageInvocation.invoke
- i += 1
- if (i < throughput)
- messageInvocation = mailbox.poll
- else {
- finishedBeforeMailboxEmpty = !mailbox.isEmpty
- messageInvocation = null
- }
- }
+ finishedBeforeMailboxEmpty = processMailbox(receiver)
} finally {
lock.unlock
- if (finishedBeforeMailboxEmpty) dispatch(receiver)
+ if (finishedBeforeMailboxEmpty)
+ dispatch(receiver)
}
}
} while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !mailbox.isEmpty))
@@ -101,6 +95,30 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat
})
} else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started")
+
+ /**
+ * Process the messages in the mailbox of the given actor.
+ *
+ * @return true if the processing finished before the mailbox was empty, due to the throughput constraint
+ */
+ def processMailbox(receiver: ActorRef): Boolean = {
+ var processedMessages = 0
+ var messageInvocation = receiver.mailbox.poll
+ while (messageInvocation != null) {
+ messageInvocation.invoke
+ processedMessages += 1
+ // check if we simply continue with other messages, or reached the throughput limit
+ if (throughput <= 0 || processedMessages < throughput)
+ messageInvocation = receiver.mailbox.poll
+ else {
+ return !receiver.mailbox.isEmpty
+ messageInvocation = null
+ }
+ }
+
+ return false
+ }
+
def start = if (!active) {
log.debug("Starting ExecutorBasedEventDrivenDispatcher [%s]", name)
log.debug("Throughput for %s = %d", name, throughput)
diff --git a/akka-core/src/main/scala/dispatch/MessageHandling.scala b/akka-core/src/main/scala/dispatch/MessageHandling.scala
index 8e094fe108..fbf52224ce 100644
--- a/akka-core/src/main/scala/dispatch/MessageHandling.scala
+++ b/akka-core/src/main/scala/dispatch/MessageHandling.scala
@@ -13,6 +13,9 @@ import java.util.concurrent.ConcurrentHashMap
import org.multiverse.commitbarriers.CountDownCommitBarrier
+/**
+ * @author Jonas Bonér
+ */
final class MessageInvocation(val receiver: ActorRef,
val message: Any,
val sender: Option[ActorRef],
@@ -49,14 +52,16 @@ final class MessageInvocation(val receiver: ActorRef,
}
}
+/**
+ * @author Jonas Bonér
+ */
trait MessageQueue {
def append(handle: MessageInvocation)
}
-trait MessageInvoker {
- def invoke(message: MessageInvocation)
-}
-
+/**
+ * @author Jonas Bonér
+ */
trait MessageDispatcher extends Logging {
protected val references = new ConcurrentHashMap[String, ActorRef]
def dispatch(invocation: MessageInvocation)
@@ -65,14 +70,16 @@ trait MessageDispatcher extends Logging {
def register(actorRef: ActorRef) = references.put(actorRef.uuid, actorRef)
def unregister(actorRef: ActorRef) = {
references.remove(actorRef.uuid)
- if (canBeShutDown)
- shutdown // shut down in the dispatcher's references is zero
+ if (canBeShutDown) shutdown // shut down if the dispatcher has no more actor references
}
def canBeShutDown: Boolean = references.isEmpty
def isShutdown: Boolean
def usesActorMailbox : Boolean
}
+/**
+ * @author Jonas Bonér
+ */
trait MessageDemultiplexer {
def select
def wakeUp
diff --git a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala
index f5d8c034c1..5c1cb78a52 100644
--- a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala
@@ -7,17 +7,16 @@ package se.scalablesolutions.akka.dispatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.Queue
-import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorMessageInvoker}
+import se.scalablesolutions.akka.actor.{Actor, ActorRef}
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
*
* @author Jonas Bonér
*/
-class ThreadBasedDispatcher(actor: ActorRef) extends MessageDispatcher {
+class ThreadBasedDispatcher(private val actor: ActorRef) extends MessageDispatcher {
private val name = actor.getClass.getName + ":" + actor.uuid
private val threadName = "thread-based:dispatcher:" + name
- private val messageHandler = new ActorMessageInvoker(actor)
private val queue = new BlockingMessageQueue(name)
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
@@ -30,7 +29,7 @@ class ThreadBasedDispatcher(actor: ActorRef) extends MessageDispatcher {
override def run = {
while (active) {
try {
- messageHandler.invoke(queue.take)
+ actor.invoke(queue.take)
} catch { case e: InterruptedException => active = false }
}
}
diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala
index 66f7b59baa..32380bec7d 100644
--- a/akka-core/src/main/scala/remote/Cluster.scala
+++ b/akka-core/src/main/scala/remote/Cluster.scala
@@ -261,11 +261,11 @@ object Cluster extends Cluster with Logging {
sup <- createSupervisor(actorRef)
} {
val serializer = Class.forName(config.getString(
- "akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME))
- .newInstance.asInstanceOf[Serializer]
+ "akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME))
+ .newInstance.asInstanceOf[Serializer]
- classLoader = serializerClassLoader orElse classLoader
- serializer.classLoader = classLoader
+ classLoader = serializerClassLoader orElse classLoader
+ serializer.classLoader = classLoader
actorRef.start
sup.start
actorRef ! InitClusterActor(serializer)
diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala
index f93c5dc345..cfb8a9a5ea 100644
--- a/akka-core/src/main/scala/remote/RemoteClient.scala
+++ b/akka-core/src/main/scala/remote/RemoteClient.scala
@@ -82,19 +82,19 @@ object RemoteClient extends Logging {
private[akka] def actorFor(uuid: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef =
RemoteActorRef(uuid, className, hostname, port, timeout, loader)
- def clientFor(hostname: String, port: Int): RemoteClient =
+ def clientFor(hostname: String, port: Int): RemoteClient =
clientFor(new InetSocketAddress(hostname, port), None)
- def clientFor(hostname: String, port: Int, loader: ClassLoader): RemoteClient =
+ def clientFor(hostname: String, port: Int, loader: ClassLoader): RemoteClient =
clientFor(new InetSocketAddress(hostname, port), Some(loader))
- def clientFor(address: InetSocketAddress): RemoteClient =
+ def clientFor(address: InetSocketAddress): RemoteClient =
clientFor(address, None)
- def clientFor(address: InetSocketAddress, loader: ClassLoader): RemoteClient =
+ def clientFor(address: InetSocketAddress, loader: ClassLoader): RemoteClient =
clientFor(address, Some(loader))
- private[akka] def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient =
+ private[akka] def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient =
clientFor(new InetSocketAddress(hostname, port), loader)
private[akka] def clientFor(address: InetSocketAddress, loader: Option[ClassLoader]): RemoteClient = synchronized {
@@ -155,7 +155,7 @@ object RemoteClient extends Logging {
/**
* @author Jonas Bonér
*/
-class RemoteClient(val hostname: String, val port: Int, loader: Option[ClassLoader]) extends Logging {
+class RemoteClient private[akka] (val hostname: String, val port: Int, loader: Option[ClassLoader]) extends Logging {
val name = "RemoteClient@" + hostname + "::" + port
@volatile private[remote] var isRunning = false
@@ -203,6 +203,10 @@ class RemoteClient(val hostname: String, val port: Int, loader: Option[ClassLoad
}
}
+ def registerListener(actorRef: ActorRef) = listeners.add(actorRef)
+
+ def deregisterListener(actorRef: ActorRef) = listeners.remove(actorRef)
+
def send[T](request: RemoteRequestProtocol, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = if (isRunning) {
if (request.getIsOneWay) {
connection.getChannel.write(request)
@@ -222,17 +226,13 @@ class RemoteClient(val hostname: String, val port: Int, loader: Option[ClassLoad
throw exception
}
- def registerSupervisorForActor(actorRef: ActorRef) =
+ private[akka] def registerSupervisorForActor(actorRef: ActorRef) =
if (!actorRef.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actorRef + " since it is not under supervision")
else supervisors.putIfAbsent(actorRef.supervisor.get.uuid, actorRef)
- def deregisterSupervisorForActor(actorRef: ActorRef) =
+ private[akka] def deregisterSupervisorForActor(actorRef: ActorRef) =
if (!actorRef.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actorRef + " since it is not under supervision")
else supervisors.remove(actorRef.supervisor.get.uuid)
-
- def registerListener(actorRef: ActorRef) = listeners.add(actorRef)
-
- def deregisterListener(actorRef: ActorRef) = listeners.remove(actorRef)
}
/**
@@ -330,7 +330,7 @@ class RemoteClientHandler(val name: String,
client.connection = bootstrap.connect(remoteAddress)
client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
if (!client.connection.isSuccess) {
- client.listeners.toArray.foreach(l =>
+ client.listeners.toArray.foreach(l =>
l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause))
log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
}
@@ -339,13 +339,13 @@ class RemoteClientHandler(val name: String,
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
- client.listeners.toArray.foreach(l =>
+ client.listeners.toArray.foreach(l =>
l.asInstanceOf[ActorRef] ! RemoteClientConnected(client.hostname, client.port))
log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
- client.listeners.toArray.foreach(l =>
+ client.listeners.toArray.foreach(l =>
l.asInstanceOf[ActorRef] ! RemoteClientDisconnected(client.hostname, client.port))
log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress)
}
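A sketch of the listener registration shown above: an actor registered via 'registerListener' receives the RemoteClientConnected, RemoteClientDisconnected and RemoteClientError messages published by the client handler. The ConnectionWatcher actor and the host/port are illustrative; the message types and their fields are read off the handler code above.

```scala
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.remote.{RemoteClient, RemoteClientConnected, RemoteClientDisconnected, RemoteClientError}

class ConnectionWatcher extends Actor {
  def receive = {
    case RemoteClientConnected(host, port)    => log.info("connected to %s:%s", host, port)
    case RemoteClientDisconnected(host, port) => log.info("disconnected from %s:%s", host, port)
    case RemoteClientError(cause)             => log.error(cause, "remote client error")
  }
}

val client = RemoteClient.clientFor("localhost", 9999)
client.registerListener(actorOf(new ConnectionWatcher).start)
```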
diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala
index 3b02a3a850..aafe38c910 100644
--- a/akka-core/src/main/scala/remote/RemoteServer.scala
+++ b/akka-core/src/main/scala/remote/RemoteServer.scala
@@ -184,7 +184,7 @@ class RemoteServer extends Logging {
def start(_hostname: String, _port: Int): RemoteServer =
start(_hostname, _port, None)
- private def start(_hostname: String, _port: Int, loader: ClassLoader): RemoteServer =
+ private def start(_hostname: String, _port: Int, loader: ClassLoader): RemoteServer =
start(_hostname, _port, Some(loader))
private def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): RemoteServer = synchronized {
@@ -364,12 +364,12 @@ class RemoteServerHandler(
val actorRef = createActor(request.getTarget, request.getUuid, request.getTimeout)
actorRef.start
val message = RemoteProtocolBuilder.getMessage(request)
- if (request.hasSender) {
- val sender = request.getSender
- if (sender ne null) actorRef.!(message)(Some(ActorRef.fromProtobuf(sender, applicationLoader)))
- } else {
+ val sender = if (request.hasSender) Some(ActorRef.fromProtobuf(request.getSender, applicationLoader))
+ else None
+ if (request.getIsOneWay) actorRef.!(message)(sender)
+ else {
try {
- val resultOrNone = actorRef !! message
+ val resultOrNone = actorRef.!!(message)(sender)
val result: AnyRef = if (resultOrNone.isDefined) resultOrNone.get else null
log.debug("Returning result from actor invocation [%s]", result)
val replyBuilder = RemoteReplyProtocol.newBuilder
diff --git a/akka-core/src/main/scala/routing/Listeners.scala b/akka-core/src/main/scala/routing/Listeners.scala
index 4bfc3f30b6..7563e0800b 100644
--- a/akka-core/src/main/scala/routing/Listeners.scala
+++ b/akka-core/src/main/scala/routing/Listeners.scala
@@ -13,7 +13,7 @@ case class Listen(listener: ActorRef) extends ListenerMessage
case class Deafen(listener: ActorRef) extends ListenerMessage
case class WithListeners(f: List[ActorRef] => Unit) extends ListenerMessage
-/**
+/**
* Listeners is a generic trait to implement listening capability on an Actor.
*
* Use the gossip(msg) method to have it sent to the listeners.
@@ -34,6 +34,6 @@ trait Listeners { self: Actor =>
}
protected def gossip(msg: Any) = listenersAsList foreach (_ ! msg)
-
+
private def listenersAsList: List[ActorRef] = listeners.toArray.toList.asInstanceOf[List[ActorRef]]
}
diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala
index a7870cec93..cdcd4ad088 100644
--- a/akka-core/src/main/scala/stm/Transaction.scala
+++ b/akka-core/src/main/scala/stm/Transaction.scala
@@ -164,7 +164,7 @@ object Transaction {
}
/**
- * See ScalaDoc on Transaction.Local class.
+ * See ScalaDoc on Transaction.Local class.
*/
def atomically[A](firstBody: => A) = elseBody(firstBody)
@@ -282,10 +282,10 @@ object Transaction {
setTransaction(Some(tx))
mtx.registerLifecycleListener(new TransactionLifecycleListener() {
def notify(mtx: MultiverseTransaction, event: TransactionLifecycleEvent) = event.name match {
- case "postCommit" =>
+ case "postCommit" =>
log.trace("Committing transaction [%s]", mtx)
tx.commit
- case "postAbort" =>
+ case "postAbort" =>
log.trace("Aborting transaction [%s]", mtx)
tx.abort
case _ => {}
diff --git a/akka-core/src/test/scala/ActorPatternsTest.scala b/akka-core/src/test/scala/ActorPatternsTest.scala
index fac59a163c..0d4e9b6b08 100644
--- a/akka-core/src/test/scala/ActorPatternsTest.scala
+++ b/akka-core/src/test/scala/ActorPatternsTest.scala
@@ -21,14 +21,14 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat
val (testMsg1,testMsg2,testMsg3,testMsg4) = ("test1","test2","test3","test4")
val targetOk = new AtomicInteger(0)
val t1 = actorOf( new Actor() {
- def receive = {
+ def receive = {
case `testMsg1` => self.reply(3)
case `testMsg2` => self.reply(7)
}
} ).start
val t2 = actorOf( new Actor() {
- def receive = {
+ def receive = {
case `testMsg3` => self.reply(11)
}
}).start
@@ -43,9 +43,9 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat
b <- (d.!)
c <- (d.!)
} yield a + b + c
-
+
result.get must be(21)
- for(a <- List(t1,t2,d)) a.stop
+ for(a <- List(t1,t2,d)) a.stop
}
@Test def testLogger = {
diff --git a/akka-core/src/test/scala/StmSpec.scala b/akka-core/src/test/scala/StmSpec.scala
index 58b9b6805f..17d4be32bd 100644
--- a/akka-core/src/test/scala/StmSpec.scala
+++ b/akka-core/src/test/scala/StmSpec.scala
@@ -95,7 +95,7 @@ class StmSpec extends
val size2: Int = (actor !! Size).getOrElse(fail("Could not get Vector::size"))
size2 should equal(3)
} catch {
- case e =>
+ case e =>
e.printStackTrace
fail(e.toString)
}
@@ -122,7 +122,7 @@ class StmSpec extends
val size4: Int = (actor !! Size).getOrElse(fail("Could not get size"))
size4 should equal(3)
} catch {
- case e =>
+ case e =>
fail(e.toString)
}
}
@@ -130,7 +130,7 @@ class StmSpec extends
/*
describe("Multiverse API") {
it("should blablabla") {
-
+
import org.multiverse.api.programmatic._
// import org.multiverse.api._
import org.multiverse.templates._
@@ -139,13 +139,13 @@ class StmSpec extends
import org.multiverse.api.{GlobalStmInstance, ThreadLocalTransaction, Transaction => MultiverseTransaction}
import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent}
import org.multiverse.commitbarriers._
-
+
def createRef[T]: ProgrammaticReference[T] = GlobalStmInstance
.getGlobalStmInstance
.getProgrammaticReferenceFactoryBuilder
.build
.atomicCreateReference(null.asInstanceOf[T])
-
+
val ref1 = Ref(0)//createRef[Int]
val ref2 = Ref(0)//createRef[Int]
@@ -185,13 +185,13 @@ class GlobalTransactionVectorTestActor extends Actor {
import se.scalablesolutions.akka.stm.Transaction.Global
private val vector: TransactionalVector[Int] = Global.atomic { TransactionalVector(1) }
-
+
def receive = {
- case Add(value) =>
+ case Add(value) =>
Global.atomic { vector + value}
self.reply(Success)
- case Size =>
+ case Size =>
val size = Global.atomic { vector.size }
self.reply(size)
}
@@ -200,12 +200,12 @@ class GlobalTransactionVectorTestActor extends Actor {
class NestedTransactorLevelOneActor extends Actor {
import GlobalTransactionVectorTestActor._
private val nested = actorOf[NestedTransactorLevelTwoActor].start
-
+
def receive = {
- case add @ Add(_) =>
+ case add @ Add(_) =>
self.reply((nested !! add).get)
- case Size =>
+ case Size =>
self.reply((nested !! Size).get)
case "HiLevelOne" => println("HiLevelOne")
@@ -216,15 +216,15 @@ class NestedTransactorLevelOneActor extends Actor {
class NestedTransactorLevelTwoActor extends Actor {
import GlobalTransactionVectorTestActor._
private val ref = Ref(0)
-
+
def receive = {
- case Add(value) =>
+ case Add(value) =>
ref.swap(value)
self.reply(Success)
- case Size =>
+ case Size =>
self.reply(ref.getOrElse(-1))
-
+
case "HiLevelTwo" => println("HiLevelTwo")
}
}
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/JerseyFoo.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/JerseyFoo.java
deleted file mode 100644
index 6828ba421f..0000000000
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/JerseyFoo.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package se.scalablesolutions.akka.api;
-
-import javax.ws.rs.Path;
-import javax.ws.rs.GET;
-import javax.ws.rs.Produces;
-
-@Path("/foo")
-public class JerseyFoo {
- @GET
- @Produces({"application/json"})
- public String foo() {
- return "hello foo";
- }
-}
\ No newline at end of file
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java
deleted file mode 100644
index 080c1cbd0b..0000000000
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package se.scalablesolutions.akka.api;
-
-public class PersistenceManager {
- private static volatile boolean isRunning = false;
- public static void init() {
- if (!isRunning) {
- se.scalablesolutions.akka.kernel.Kernel$.MODULE$.startRemoteService();
- isRunning = true;
- }
- }
-}
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java
deleted file mode 100644
index d5c1bdf00c..0000000000
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package se.scalablesolutions.akka.api;
-
-import se.scalablesolutions.akka.persistence.common.*;
-import se.scalablesolutions.akka.persistence.cassandra.*;
-import se.scalablesolutions.akka.actor.annotation.inittransactionalstate;
-
-public class PersistentClasher {
- private PersistentMap state;
-
- @inittransactionalstate
- public void init() {
- state = CassandraStorage.newMap();
- }
-
- public String getState(String key) {
- return (String)state.get(key).get();
- }
-
- public void setState(String key, String msg) {
- state.put(key, msg);
- }
-
- public void clash() {
- state.put("clasher", "was here");
- }
-}
\ No newline at end of file
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentFailer.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentFailer.java
deleted file mode 100644
index 44cab7d6c3..0000000000
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentFailer.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package se.scalablesolutions.akka.api;
-
-public class PersistentFailer implements java.io.Serializable {
- public int fail() {
- throw new RuntimeException("expected");
- }
-}
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java
deleted file mode 100644
index 796d3d913a..0000000000
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Scalable Solutions AB
- */
-
-package se.scalablesolutions.akka.api;
-
-import se.scalablesolutions.akka.config.*;
-import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
-import static se.scalablesolutions.akka.config.JavaConfig.*;
-import se.scalablesolutions.akka.actor.*;
-import se.scalablesolutions.akka.kernel.Kernel;
-
-import junit.framework.TestCase;
-
-public class PersistentNestedStateTest extends TestCase {
- static String messageLog = "";
-
- final private ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
-
- protected void setUp() {
- PersistenceManager.init();
- conf.configure(
- new RestartStrategy(new AllForOne(), 3, 5000, new Class[] {Exception.class}),
- new Component[]{
- new Component(PersistentStateful.class, new LifeCycle(new Permanent()), 10000000),
- new Component(PersistentStatefulNested.class, new LifeCycle(new Permanent()), 10000000),
- new Component(PersistentFailer.class, new LifeCycle(new Permanent()), 1000)
- //new Component("inmem-clasher", InMemClasher.class, InMemClasherImpl.class, new LifeCycle(new Permanent()), 100000)
- }).inject().supervise();
- }
-
- protected void tearDown() {
- conf.stop();
- }
-
- public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() throws Exception {
- PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
- PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class);
- stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
- nested.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
- assertEquals("new state", nested.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
- assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
- }
-
- public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() {
- PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
- stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
- PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class);
- nested.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
- PersistentFailer failer = conf.getInstance(PersistentFailer.class);
- try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
- fail("should have thrown an exception");
- } catch (RuntimeException e) {
- } // expected
- assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state
- assertEquals("init", nested.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state
- }
-
- public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
- PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
- stateful.setVectorState("init"); // set init state
- PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class);
- nested.setVectorState("init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
- assertEquals(2, stateful.getVectorLength()); // BAD: keeps one element since last test
- assertEquals(2, nested.getVectorLength());
- }
-
- public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() {
- PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
- stateful.setVectorState("init"); // set init state
- PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class);
- nested.setVectorState("init"); // set init state
- PersistentFailer failer = conf.getInstance(PersistentFailer.class);
- try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
- fail("should have thrown an exception");
- } catch (RuntimeException e) {
- } // expected
- assertEquals(1, stateful.getVectorLength());
- assertEquals(1, nested.getVectorLength());
- }
-
- public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
- PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
- PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class);
- stateful.setRefState("init"); // set init state
- nested.setRefState("init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
- assertEquals("new state", stateful.getRefState());
- assertEquals("new state", nested.getRefState());
- }
-
- public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() {
- PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
- PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class);
- stateful.setRefState("init"); // set init state
- nested.setRefState("init"); // set init state
- PersistentFailer failer = conf.getInstance(PersistentFailer.class);
- try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
- fail("should have thrown an exception");
- } catch (RuntimeException e) {
- } // expected
- assertEquals("init", stateful.getRefState()); // check that state is == init state
- assertEquals("init", nested.getRefState()); // check that state is == init state
- }
-}
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java
deleted file mode 100644
index 74f0afcf53..0000000000
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Scalable Solutions AB
- */
-
-package se.scalablesolutions.akka.api;
-
-import se.scalablesolutions.akka.config.*;
-import static se.scalablesolutions.akka.config.JavaConfig.*;
-
-import junit.framework.TestCase;
-
-public class PersistentStateTest extends TestCase {
- static String messageLog = "";
-
- final private ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
-
- protected void setUp() {
- PersistenceManager.init();
- conf.configure(
- new RestartStrategy(new AllForOne(), 3, 5000, new Class[] {Exception.class}),
- new Component[] {
- new Component(PersistentStateful.class, new LifeCycle(new Permanent()), 10000000),
- new Component(PersistentFailer.class, new LifeCycle(new Permanent()), 1000)
- //new Component(PersistentClasher.class, new LifeCycle(new Permanent()), 100000)
- }).supervise();
- }
-
- protected void tearDown() {
- conf.stop();
- }
-
- public void testShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
- PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
- stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
- assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
- }
-
- public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() {
- PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
- stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
- PersistentFailer failer = conf.getInstance(PersistentFailer.class);
- try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
- fail("should have thrown an exception");
- } catch (RuntimeException e) {
- } // expected
- assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state
- }
-
- public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
- PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
- stateful.setVectorState("init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
- assertEquals("init", stateful.getVectorState(0));
- assertEquals("new state", stateful.getVectorState(1));
- }
-
- public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() {
- PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
- stateful.setVectorState("init"); // set init state
- PersistentFailer failer = conf.getInstance(PersistentFailer.class);
- try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
- fail("should have thrown an exception");
- } catch (RuntimeException e) {
- } // expected
- assertEquals("init", stateful.getVectorState(0)); // check that state is == init state
- }
-
- public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
- PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
- stateful.setRefState("init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
- assertEquals("new state", stateful.getRefState());
- }
-
- public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() {
- PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
- stateful.setRefState("init"); // set init state
- PersistentFailer failer = conf.getInstance(PersistentFailer.class);
- try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
- fail("should have thrown an exception");
- } catch (RuntimeException e) {
- } // expected
- assertEquals("init", stateful.getRefState()); // check that state is == init state
- }
-}
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java
deleted file mode 100644
index 6a8d3353b7..0000000000
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package se.scalablesolutions.akka.api;
-
-import se.scalablesolutions.akka.actor.annotation.inittransactionalstate;
-import se.scalablesolutions.akka.actor.annotation.transactionrequired;
-import se.scalablesolutions.akka.persistence.common.*;
-import se.scalablesolutions.akka.persistence.cassandra.*;
-
-@transactionrequired
-public class PersistentStateful {
- private PersistentMap mapState;
- private PersistentVector vectorState;
- private PersistentRef refState;
-
- @inittransactionalstate
- public void init() {
- mapState = CassandraStorage.newMap();
- vectorState = CassandraStorage.newVector();
- refState = CassandraStorage.newRef();
- }
-
- public String getMapState(String key) {
- byte[] bytes = (byte[]) mapState.get(key.getBytes()).get();
- return new String(bytes, 0, bytes.length);
- }
-
- public String getVectorState(int index) {
- byte[] bytes = (byte[]) vectorState.get(index);
- return new String(bytes, 0, bytes.length);
- }
-
- public int getVectorLength() {
- return vectorState.length();
- }
-
- public String getRefState() {
- if (refState.isDefined()) {
- byte[] bytes = (byte[]) refState.get().get();
- return new String(bytes, 0, bytes.length);
- } else throw new IllegalStateException("No such element");
- }
-
- public void setMapState(String key, String msg) {
- mapState.put(key.getBytes(), msg.getBytes());
- }
-
- public void setVectorState(String msg) {
- vectorState.add(msg.getBytes());
- }
-
- public void setRefState(String msg) {
- refState.swap(msg.getBytes());
- }
-
- public void success(String key, String msg) {
- mapState.put(key.getBytes(), msg.getBytes());
- vectorState.add(msg.getBytes());
- refState.swap(msg.getBytes());
- }
-
- public String failure(String key, String msg, PersistentFailer failer) {
- mapState.put(key.getBytes(), msg.getBytes());
- vectorState.add(msg.getBytes());
- refState.swap(msg.getBytes());
- failer.fail();
- return msg;
- }
-
- public String success(String key, String msg, PersistentStatefulNested nested) {
- mapState.put(key.getBytes(), msg.getBytes());
- vectorState.add(msg.getBytes());
- refState.swap(msg.getBytes());
- nested.success(key, msg);
- return msg;
- }
-
- public String failure(String key, String msg, PersistentStatefulNested nested, PersistentFailer failer) {
- mapState.put(key.getBytes(), msg.getBytes());
- vectorState.add(msg.getBytes());
- refState.swap(msg.getBytes());
- nested.failure(key, msg, failer);
- return msg;
- }
-}
-
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java
deleted file mode 100644
index bd931ef108..0000000000
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package se.scalablesolutions.akka.api;
-
-import se.scalablesolutions.akka.actor.annotation.inittransactionalstate;
-import se.scalablesolutions.akka.actor.annotation.transactionrequired;
-import se.scalablesolutions.akka.persistence.common.*;
-import se.scalablesolutions.akka.persistence.cassandra.*;
-
-@transactionrequired
-public class PersistentStatefulNested {
- private PersistentMap mapState;
- private PersistentVector vectorState;
- private PersistentRef refState;
-
- @inittransactionalstate
- public void init() {
- mapState = CassandraStorage.newMap();
- vectorState = CassandraStorage.newVector();
- refState = CassandraStorage.newRef();
- }
-
- public String getMapState(String key) {
- byte[] bytes = (byte[]) mapState.get(key.getBytes()).get();
- return new String(bytes, 0, bytes.length);
- }
-
-
- public String getVectorState(int index) {
- byte[] bytes = (byte[]) vectorState.get(index);
- return new String(bytes, 0, bytes.length);
- }
-
- public int getVectorLength() {
- return vectorState.length();
- }
-
- public String getRefState() {
- if (refState.isDefined()) {
- byte[] bytes = (byte[]) refState.get().get();
- return new String(bytes, 0, bytes.length);
- } else throw new IllegalStateException("No such element");
- }
-
- public void setMapState(String key, String msg) {
- mapState.put(key.getBytes(), msg.getBytes());
- }
-
- public void setVectorState(String msg) {
- vectorState.add(msg.getBytes());
- }
-
- public void setRefState(String msg) {
- refState.swap(msg.getBytes());
- }
-
- public String success(String key, String msg) {
- mapState.put(key.getBytes(), msg.getBytes());
- vectorState.add(msg.getBytes());
- refState.swap(msg.getBytes());
- return msg;
- }
-
- public String failure(String key, String msg, PersistentFailer failer) {
- mapState.put(key.getBytes(), msg.getBytes());
- vectorState.add(msg.getBytes());
- refState.swap(msg.getBytes());
- failer.fail();
- return msg;
- }
-}
-
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ProtobufProtocol.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ProtobufProtocol.java
deleted file mode 100644
index e7b15ca1ef..0000000000
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ProtobufProtocol.java
+++ /dev/null
@@ -1,403 +0,0 @@
-// Generated by the protocol buffer compiler. DO NOT EDIT!
-
-package se.scalablesolutions.akka.api;
-
-public final class ProtobufProtocol {
- private ProtobufProtocol() {}
- public static void registerAllExtensions(
- com.google.protobuf.ExtensionRegistry registry) {
- }
- public static final class ProtobufPOJO extends
- com.google.protobuf.GeneratedMessage {
- // Use ProtobufPOJO.newBuilder() to construct.
- private ProtobufPOJO() {}
-
- private static final ProtobufPOJO defaultInstance = new ProtobufPOJO();
- public static ProtobufPOJO getDefaultInstance() {
- return defaultInstance;
- }
-
- public ProtobufPOJO getDefaultInstanceForType() {
- return defaultInstance;
- }
-
- public static final com.google.protobuf.Descriptors.Descriptor
- getDescriptor() {
- return se.scalablesolutions.akka.api.ProtobufProtocol.internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_descriptor;
- }
-
- protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
- internalGetFieldAccessorTable() {
- return se.scalablesolutions.akka.api.ProtobufProtocol.internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_fieldAccessorTable;
- }
-
- // required uint64 id = 1;
- public static final int ID_FIELD_NUMBER = 1;
- private boolean hasId;
- private long id_ = 0L;
- public boolean hasId() { return hasId; }
- public long getId() { return id_; }
-
- // required string name = 2;
- public static final int NAME_FIELD_NUMBER = 2;
- private boolean hasName;
- private java.lang.String name_ = "";
- public boolean hasName() { return hasName; }
- public java.lang.String getName() { return name_; }
-
- // required bool status = 3;
- public static final int STATUS_FIELD_NUMBER = 3;
- private boolean hasStatus;
- private boolean status_ = false;
- public boolean hasStatus() { return hasStatus; }
- public boolean getStatus() { return status_; }
-
- public final boolean isInitialized() {
- if (!hasId) return false;
- if (!hasName) return false;
- if (!hasStatus) return false;
- return true;
- }
-
- public void writeTo(com.google.protobuf.CodedOutputStream output)
- throws java.io.IOException {
- if (hasId()) {
- output.writeUInt64(1, getId());
- }
- if (hasName()) {
- output.writeString(2, getName());
- }
- if (hasStatus()) {
- output.writeBool(3, getStatus());
- }
- getUnknownFields().writeTo(output);
- }
-
- private int memoizedSerializedSize = -1;
- public int getSerializedSize() {
- int size = memoizedSerializedSize;
- if (size != -1) return size;
-
- size = 0;
- if (hasId()) {
- size += com.google.protobuf.CodedOutputStream
- .computeUInt64Size(1, getId());
- }
- if (hasName()) {
- size += com.google.protobuf.CodedOutputStream
- .computeStringSize(2, getName());
- }
- if (hasStatus()) {
- size += com.google.protobuf.CodedOutputStream
- .computeBoolSize(3, getStatus());
- }
- size += getUnknownFields().getSerializedSize();
- memoizedSerializedSize = size;
- return size;
- }
-
- public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(
- com.google.protobuf.ByteString data)
- throws com.google.protobuf.InvalidProtocolBufferException {
- return newBuilder().mergeFrom(data).buildParsed();
- }
- public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(
- com.google.protobuf.ByteString data,
- com.google.protobuf.ExtensionRegistryLite extensionRegistry)
- throws com.google.protobuf.InvalidProtocolBufferException {
- return newBuilder().mergeFrom(data, extensionRegistry)
- .buildParsed();
- }
- public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(byte[] data)
- throws com.google.protobuf.InvalidProtocolBufferException {
- return newBuilder().mergeFrom(data).buildParsed();
- }
- public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(
- byte[] data,
- com.google.protobuf.ExtensionRegistryLite extensionRegistry)
- throws com.google.protobuf.InvalidProtocolBufferException {
- return newBuilder().mergeFrom(data, extensionRegistry)
- .buildParsed();
- }
- public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(java.io.InputStream input)
- throws java.io.IOException {
- return newBuilder().mergeFrom(input).buildParsed();
- }
- public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(
- java.io.InputStream input,
- com.google.protobuf.ExtensionRegistryLite extensionRegistry)
- throws java.io.IOException {
- return newBuilder().mergeFrom(input, extensionRegistry)
- .buildParsed();
- }
- public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseDelimitedFrom(java.io.InputStream input)
- throws java.io.IOException {
- return newBuilder().mergeDelimitedFrom(input).buildParsed();
- }
- public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseDelimitedFrom(
- java.io.InputStream input,
- com.google.protobuf.ExtensionRegistryLite extensionRegistry)
- throws java.io.IOException {
- return newBuilder().mergeDelimitedFrom(input, extensionRegistry)
- .buildParsed();
- }
- public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(
- com.google.protobuf.CodedInputStream input)
- throws java.io.IOException {
- return newBuilder().mergeFrom(input).buildParsed();
- }
- public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(
- com.google.protobuf.CodedInputStream input,
- com.google.protobuf.ExtensionRegistryLite extensionRegistry)
- throws java.io.IOException {
- return newBuilder().mergeFrom(input, extensionRegistry)
- .buildParsed();
- }
-
- public static Builder newBuilder() { return Builder.create(); }
- public Builder newBuilderForType() { return newBuilder(); }
- public static Builder newBuilder(se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO prototype) {
- return newBuilder().mergeFrom(prototype);
- }
- public Builder toBuilder() { return newBuilder(this); }
-
- public static final class Builder extends
- com.google.protobuf.GeneratedMessage.Builder {
- private se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO result;
-
- // Construct using se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.newBuilder()
- private Builder() {}
-
- private static Builder create() {
- Builder builder = new Builder();
- builder.result = new se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO();
- return builder;
- }
-
- protected se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO internalGetResult() {
- return result;
- }
-
- public Builder clear() {
- if (result == null) {
- throw new IllegalStateException(
- "Cannot call clear() after build().");
- }
- result = new se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO();
- return this;
- }
-
- public Builder clone() {
- return create().mergeFrom(result);
- }
-
- public com.google.protobuf.Descriptors.Descriptor
- getDescriptorForType() {
- return se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.getDescriptor();
- }
-
- public se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO getDefaultInstanceForType() {
- return se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.getDefaultInstance();
- }
-
- public boolean isInitialized() {
- return result.isInitialized();
- }
- public se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO build() {
- if (result != null && !isInitialized()) {
- throw newUninitializedMessageException(result);
- }
- return buildPartial();
- }
-
- private se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO buildParsed()
- throws com.google.protobuf.InvalidProtocolBufferException {
- if (!isInitialized()) {
- throw newUninitializedMessageException(
- result).asInvalidProtocolBufferException();
- }
- return buildPartial();
- }
-
- public se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO buildPartial() {
- if (result == null) {
- throw new IllegalStateException(
- "build() has already been called on this Builder.");
- }
- se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO returnMe = result;
- result = null;
- return returnMe;
- }
-
- public Builder mergeFrom(com.google.protobuf.Message other) {
- if (other instanceof se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO) {
- return mergeFrom((se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO)other);
- } else {
- super.mergeFrom(other);
- return this;
- }
- }
-
- public Builder mergeFrom(se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO other) {
- if (other == se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.getDefaultInstance()) return this;
- if (other.hasId()) {
- setId(other.getId());
- }
- if (other.hasName()) {
- setName(other.getName());
- }
- if (other.hasStatus()) {
- setStatus(other.getStatus());
- }
- this.mergeUnknownFields(other.getUnknownFields());
- return this;
- }
-
- public Builder mergeFrom(
- com.google.protobuf.CodedInputStream input,
- com.google.protobuf.ExtensionRegistryLite extensionRegistry)
- throws java.io.IOException {
- com.google.protobuf.UnknownFieldSet.Builder unknownFields =
- com.google.protobuf.UnknownFieldSet.newBuilder(
- this.getUnknownFields());
- while (true) {
- int tag = input.readTag();
- switch (tag) {
- case 0:
- this.setUnknownFields(unknownFields.build());
- return this;
- default: {
- if (!parseUnknownField(input, unknownFields,
- extensionRegistry, tag)) {
- this.setUnknownFields(unknownFields.build());
- return this;
- }
- break;
- }
- case 8: {
- setId(input.readUInt64());
- break;
- }
- case 18: {
- setName(input.readString());
- break;
- }
- case 24: {
- setStatus(input.readBool());
- break;
- }
- }
- }
- }
-
-
- // required uint64 id = 1;
- public boolean hasId() {
- return result.hasId();
- }
- public long getId() {
- return result.getId();
- }
- public Builder setId(long value) {
- result.hasId = true;
- result.id_ = value;
- return this;
- }
- public Builder clearId() {
- result.hasId = false;
- result.id_ = 0L;
- return this;
- }
-
- // required string name = 2;
- public boolean hasName() {
- return result.hasName();
- }
- public java.lang.String getName() {
- return result.getName();
- }
- public Builder setName(java.lang.String value) {
- if (value == null) {
- throw new NullPointerException();
- }
- result.hasName = true;
- result.name_ = value;
- return this;
- }
- public Builder clearName() {
- result.hasName = false;
- result.name_ = getDefaultInstance().getName();
- return this;
- }
-
- // required bool status = 3;
- public boolean hasStatus() {
- return result.hasStatus();
- }
- public boolean getStatus() {
- return result.getStatus();
- }
- public Builder setStatus(boolean value) {
- result.hasStatus = true;
- result.status_ = value;
- return this;
- }
- public Builder clearStatus() {
- result.hasStatus = false;
- result.status_ = false;
- return this;
- }
- }
-
- static {
- se.scalablesolutions.akka.api.ProtobufProtocol.getDescriptor();
- }
-
- static {
- se.scalablesolutions.akka.api.ProtobufProtocol.internalForceInit();
- }
- }
-
- private static com.google.protobuf.Descriptors.Descriptor
- internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_descriptor;
- private static
- com.google.protobuf.GeneratedMessage.FieldAccessorTable
- internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_fieldAccessorTable;
-
- public static com.google.protobuf.Descriptors.FileDescriptor
- getDescriptor() {
- return descriptor;
- }
- private static com.google.protobuf.Descriptors.FileDescriptor
- descriptor;
- static {
- java.lang.String[] descriptorData = {
- "\n4se/scalablesolutions/akka/api/Protobuf" +
- "Protocol.proto\022\035se.scalablesolutions.akk" +
- "a.api\"8\n\014ProtobufPOJO\022\n\n\002id\030\001 \002(\004\022\014\n\004nam" +
- "e\030\002 \002(\t\022\016\n\006status\030\003 \002(\010"
- };
- com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
- new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
- public com.google.protobuf.ExtensionRegistry assignDescriptors(
- com.google.protobuf.Descriptors.FileDescriptor root) {
- descriptor = root;
- internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_descriptor =
- getDescriptor().getMessageTypes().get(0);
- internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_fieldAccessorTable = new
- com.google.protobuf.GeneratedMessage.FieldAccessorTable(
- internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_descriptor,
- new java.lang.String[] { "Id", "Name", "Status", },
- se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.class,
- se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.Builder.class);
- return null;
- }
- };
- com.google.protobuf.Descriptors.FileDescriptor
- .internalBuildGeneratedFileFrom(descriptorData,
- new com.google.protobuf.Descriptors.FileDescriptor[] {
- }, assigner);
- }
-
- public static void internalForceInit() {}
-}
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ProtobufProtocol.proto b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ProtobufProtocol.proto
deleted file mode 100644
index 8140ad313a..0000000000
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ProtobufProtocol.proto
+++ /dev/null
@@ -1,17 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Scalable Solutions AB
- */
-
-package se.scalablesolutions.akka.api;
-
-/*
- Compile with:
- cd ./akka-fun-test-java/src/test/java
- protoc se/scalablesolutions/akka/api/ProtobufProtocol.proto --java_out .
-*/
-
-message ProtobufPOJO {
- required uint64 id = 1;
- required string name = 2;
- required bool status = 3;
-}
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ProtobufSerializationTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ProtobufSerializationTest.java
deleted file mode 100644
index a67560146d..0000000000
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ProtobufSerializationTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Scalable Solutions AB
- */
-
-package se.scalablesolutions.akka.api;
-
-import junit.framework.TestCase;
-import se.scalablesolutions.akka.serialization.SerializerFactory;
-
-public class ProtobufSerializationTest extends TestCase {
- public void testOutIn() throws Exception {
- SerializerFactory factory = new SerializerFactory();
- ProtobufProtocol.ProtobufPOJO pojo1 = ProtobufProtocol.ProtobufPOJO.getDefaultInstance().toBuilder().setId(1).setName("protobuf").setStatus(true).build();
-
- byte[] bytes = factory.getProtobuf().out(pojo1);
- Object obj = factory.getProtobuf().in(bytes, pojo1.getClass());
-
- assertTrue(obj instanceof ProtobufProtocol.ProtobufPOJO);
- ProtobufProtocol.ProtobufPOJO pojo2 = (ProtobufProtocol.ProtobufPOJO)obj;
- assertEquals(pojo1.getId(), pojo2.getId());
- assertEquals(pojo1.getName(), pojo2.getName());
- assertEquals(pojo1.getStatus(), pojo2.getStatus());
- }
-
- public void testDeepClone() throws Exception {
- SerializerFactory factory = new SerializerFactory();
- ProtobufProtocol.ProtobufPOJO pojo1 = ProtobufProtocol.ProtobufPOJO.getDefaultInstance().toBuilder().setId(1).setName("protobuf").setStatus(true).build();
-
- Object obj = factory.getProtobuf().deepClone(pojo1);
-
- assertTrue(obj instanceof ProtobufProtocol.ProtobufPOJO);
- ProtobufProtocol.ProtobufPOJO pojo2 = (ProtobufProtocol.ProtobufPOJO)obj;
- assertEquals(pojo1.getId(), pojo2.getId());
- assertEquals(pojo1.getName(), pojo2.getName());
- assertEquals(pojo1.getStatus(), pojo2.getStatus());
- }
-}
-
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java
deleted file mode 100644
index 18c3b1ed62..0000000000
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Scalable Solutions AB
- */
-
-package se.scalablesolutions.akka.api;
-
-import se.scalablesolutions.akka.config.*;
-import static se.scalablesolutions.akka.config.JavaConfig.*;
-
-import junit.framework.TestCase;
-
-public class RemotePersistentStateTest extends TestCase {
- static String messageLog = "";
-
- final private ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
-
- protected void setUp() {
- PersistenceManager.init();
- conf.configure(
- new RestartStrategy(new AllForOne(), 3, 5000, new Class[]{Exception.class}),
- new Component[] {
- new Component(PersistentStateful.class, new LifeCycle(new Permanent()), 1000000, new RemoteAddress("localhost", 9999)),
- new Component(PersistentFailer.class, new LifeCycle(new Permanent()), 1000000, new RemoteAddress("localhost", 9999))
- }).supervise();
- }
-
- protected void tearDown() {
- conf.stop();
- }
-
- public void testShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
- PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
- stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
- assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
- }
-
- public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() {
- PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
- stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
- PersistentFailer failer = conf.getInstance(PersistentFailer.class);
- try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "MapShouldRollBack", failer); // call failing transactionrequired method
- fail("should have thrown an exception");
- } catch (RuntimeException e) {
- } // expected
- assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state
- }
-
- public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
- PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
- int init = stateful.getVectorLength();
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "VectorShouldNotRollback"); // transactionrequired
- assertEquals(init + 1, stateful.getVectorLength());
- }
-
- public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() {
- PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
- int init = stateful.getVectorLength();
- PersistentFailer failer = conf.getInstance(PersistentFailer.class);
- try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
- fail("should have thrown an exception");
- } catch (RuntimeException e) {
- } // expected
- assertEquals(init, stateful.getVectorLength());
- }
-
- public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
- PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
- stateful.setRefState("init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
- assertEquals("new state", stateful.getRefState());
- }
-
- public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() {
- PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
- stateful.setRefState("init"); // set init state
- PersistentFailer failer = conf.getInstance(PersistentFailer.class);
- try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
- fail("should have thrown an exception");
- } catch (RuntimeException e) {
- } // expected
- assertEquals("init", stateful.getRefState()); // check that state is == init state
- }
-
-}
\ No newline at end of file
diff --git a/akka-fun-test-java/testng.xml b/akka-fun-test-java/testng.xml
deleted file mode 100644
index b894d3880b..0000000000
--- a/akka-fun-test-java/testng.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/akka-http/src/main/scala/AkkaLoader.scala b/akka-http/src/main/scala/AkkaLoader.scala
index 431758e2c7..886c2c0688 100644
--- a/akka-http/src/main/scala/AkkaLoader.scala
+++ b/akka-http/src/main/scala/AkkaLoader.scala
@@ -47,29 +47,29 @@ class AkkaLoader extends Logging {
private def printBanner = {
log.info(
"""
- t
- t t t
- t t tt t
- tt t t tt t
- t ttttttt t ttt t
- t tt ttt t ttt t
- t t ttt t ttt t t
- tt t ttt ttt ttt t
- t t ttt ttt t tt t
- t ttt ttt t t
- tt ttt ttt t
- ttt ttt
- tttttttt ttt ttt ttt ttt tttttttt
- ttt tt ttt ttt ttt ttt ttt ttt
- ttt ttt ttt ttt ttt ttt ttt ttt
- ttt ttt ttt ttt ttt tt ttt ttt
- tttt ttttttttt tttttttt tttt
- ttttttttt ttt ttt ttt ttt ttttttttt
- ttt ttt ttt ttt ttt ttt ttt ttt
- ttt ttt ttt ttt ttt ttt ttt ttt
- ttt tt ttt ttt ttt ttt ttt ttt
+ t
+ t t t
+ t t tt t
+ tt t t tt t
+ t ttttttt t ttt t
+ t tt ttt t ttt t
+ t t ttt t ttt t t
+ tt t ttt ttt ttt t
+ t t ttt ttt t tt t
+ t ttt ttt t t
+ tt ttt ttt t
+ ttt ttt
tttttttt ttt ttt ttt ttt tttttttt
-
+ ttt tt ttt ttt ttt ttt ttt ttt
+ ttt ttt ttt ttt ttt ttt ttt ttt
+ ttt ttt ttt ttt ttt tt ttt ttt
+ tttt ttttttttt tttttttt tttt
+ ttttttttt ttt ttt ttt ttt ttttttttt
+ ttt ttt ttt ttt ttt ttt ttt ttt
+ ttt ttt ttt ttt ttt ttt ttt ttt
+ ttt tt ttt ttt ttt ttt ttt ttt
+ tttttttt ttt ttt ttt ttt tttttttt
+
==================================================
""")
log.info(" Running version %s", Config.VERSION)
diff --git a/akka-kernel/src/main/scala/EmbeddedAppServer.scala b/akka-kernel/src/main/scala/EmbeddedAppServer.scala
index 41e6dd44ae..8d9982c7e2 100644
--- a/akka-kernel/src/main/scala/EmbeddedAppServer.scala
+++ b/akka-kernel/src/main/scala/EmbeddedAppServer.scala
@@ -50,14 +50,14 @@ trait EmbeddedAppServer extends Bootable with Logging {
Thread.currentThread.setContextClassLoader(applicationLoader.get)
super.init(sc)
}
- finally {
+ finally {
Thread.currentThread.setContextClassLoader(cl)
}
}
})
adapter.setContextPath(uri.getPath)
- adapter.addInitParameter("cometSupport",
+ adapter.addInitParameter("cometSupport",
"org.atmosphere.container.GrizzlyCometSupport")
adapter.addInitParameter("com.sun.jersey.config.property.resourceConfigClass",
"com.sun.jersey.api.core.PackagesResourceConfig")
@@ -65,7 +65,7 @@ trait EmbeddedAppServer extends Bootable with Logging {
config.getList("akka.rest.resource_packages").mkString(";")
)
adapter.addInitParameter("com.sun.jersey.spi.container.ResourceFilters",
- config.getList("akka.rest.filters").mkString(",")
+ config.getList("akka.rest.filters").mkString(",")
)
if (HOME.isDefined) adapter.addRootFolder(HOME.get + "/deploy/root")
diff --git a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala
index b67a0e9256..f393f4a162 100644
--- a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala
+++ b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala
@@ -42,7 +42,7 @@ private[akka] object CassandraStorageBackend extends
case "ALL" => ConsistencyLevel.ALL
case "ANY" => ConsistencyLevel.ANY
case unknown => throw new IllegalArgumentException(
- "Cassandra consistency level [" + unknown + "] is not supported." +
+ "Cassandra consistency level [" + unknown + "] is not supported." +
"\n\tExpected one of [ZERO, ONE, QUORUM, DCQUORUM, DCQUORUMSYNC, ALL, ANY] in the akka.conf configuration file.")
}
}
@@ -105,9 +105,9 @@ private[akka] object CassandraStorageBackend extends
}
}
- def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) =
+ def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) =
elements.foreach(insertVectorStorageEntryFor(name, _))
-
+
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {
val columnPath = new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family)
columnPath.setColumn(intToBytes(index))
diff --git a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala
index 6746fca529..648dec2be3 100644
--- a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala
+++ b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala
@@ -171,4 +171,4 @@ object EmbeddedCassandraService {
def start: Unit = {}
}
-*/
\ No newline at end of file
+*/
diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
index bcfb2c1025..6a0eb9a8d8 100644
--- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
+++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
@@ -30,7 +30,7 @@ class StorageException(message: String) extends RuntimeException(message)
*