diff --git a/.gitignore b/.gitignore index c4a1e0a3be..ef2fb469bb 100755 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,8 @@ *~ *# src_managed -project/plugins/project/ +activemq-data +project/plugins/project project/boot/* */project/build/target */project/boot diff --git a/akka-fun-test-java/pom.xml b/akka-active-object-test/pom.xml similarity index 54% rename from akka-fun-test-java/pom.xml rename to akka-active-object-test/pom.xml index c8ddef3320..62935a30b4 100644 --- a/akka-fun-test-java/pom.xml +++ b/akka-active-object-test/pom.xml @@ -3,24 +3,28 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 4.0.0 - Akka Functional Tests in Java - akka-fun-test-java + Akka Active Object Tests in Java + akka-active-object-test se.scalablesolutions.akka 0.9 jar - 2.8.0.RC2 - 0.5.2 - 1.1.5 - 1.9.18-i + 2.8.0.RC3 embedded-repo Embedded Repository - file://Users/jboner/src/scala/akka/embedded-repo + file:///Users/jboner/src/scala/akka/embedded-repo + + + + jboss + JBoss Repository + https://repository.jboss.org/nexus/content/groups/public + @@ -35,59 +39,9 @@ se.scalablesolutions.akka - akka-kernel_2.8.0.RC2 + akka-core_2.8.0.RC3 0.9 - - se.scalablesolutions.akka - akka-persistence-cassandra_2.8.0.RC2 - 0.9 - - - com.google.protobuf - protobuf-java - 2.2.0 - - - org.codehaus.jackson - jackson-core-asl - 1.2.1 - - - org.codehaus.jackson - jackson-mapper-asl - 1.2.1 - - - com.sun.grizzly - grizzly-servlet-webserver - ${grizzly.version} - test - - - com.sun.jersey - jersey-server - ${jersey.version} - test - - - com.sun.jersey - jersey-json - ${jersey.version} - test - - - com.sun.jersey - jersey-client - ${jersey.version} - test - - - com.sun.jersey - jersey-atom - ${jersey.version} - test - junit junit diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java similarity index 100% rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/AllTest.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/AllTest.java similarity index 67% rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/AllTest.java rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/AllTest.java index 2e724f0324..465c9da182 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/AllTest.java +++ b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/AllTest.java @@ -8,13 +8,9 @@ public class AllTest extends TestCase { public static Test suite() { TestSuite suite = new TestSuite("All Java tests"); suite.addTestSuite(InMemoryStateTest.class); - //suite.addTestSuite(InMemNestedStateTest.class); + suite.addTestSuite(InMemNestedStateTest.class); suite.addTestSuite(RemoteInMemoryStateTest.class); suite.addTestSuite(ActiveObjectGuiceConfiguratorTest.class); - //suite.addTestSuite(PersistentStateTest.class); - //suite.addTestSuite(PersistentNestedStateTest.class); - //suite.addTestSuite(RemotePersistentStateTest.class); - //suite.addTestSuite(RestTest.class); return suite; } diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/Bar.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/Bar.java similarity index 100% rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/Bar.java rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/Bar.java diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/BarImpl.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/BarImpl.java similarity index 100% rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/BarImpl.java rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/BarImpl.java diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/Ext.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/Ext.java similarity index 100% rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/Ext.java rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/Ext.java diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ExtImpl.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/ExtImpl.java similarity index 100% rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ExtImpl.java rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/ExtImpl.java diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/Foo.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/Foo.java similarity index 100% rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/Foo.java rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/Foo.java diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemFailer.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemFailer.java similarity index 100% rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemFailer.java rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemFailer.java diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java similarity index 99% rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java index 992c188fa1..746df950bf 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java +++ b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java @@ -9,7 +9,6 @@ import se.scalablesolutions.akka.config.Config; import se.scalablesolutions.akka.config.ActiveObjectConfigurator; import static se.scalablesolutions.akka.config.JavaConfig.*; import se.scalablesolutions.akka.actor.*; -import se.scalablesolutions.akka.kernel.Kernel; import junit.framework.TestCase; public class InMemNestedStateTest extends TestCase { diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java similarity index 100% rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java similarity index 100% rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java similarity index 99% rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java index 740bfd892c..3708d58acc 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java +++ b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java @@ -13,7 +13,6 @@ import se.scalablesolutions.akka.config.ActiveObjectConfigurator; import static se.scalablesolutions.akka.config.JavaConfig.*; import se.scalablesolutions.akka.actor.*; -import se.scalablesolutions.akka.kernel.Kernel; public class InMemoryStateTest extends TestCase { static String messageLog = ""; diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java b/akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java similarity index 100% rename from akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java rename to akka-active-object-test/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 2d83cf2ba4..a2d2820cca 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -66,23 +66,23 @@ final class ActiveObjectConfiguration { } /** - * Holds RTTI (runtime type information) for the Active Object, f.e. current 'sender' + * Holds RTTI (runtime type information) for the Active Object, f.e. current 'sender' * reference, the 'senderFuture' reference etc. *

- * In order to make use of this context you have to create a member field in your - * Active Object that has the type 'ActiveObjectContext', then an instance will - * be injected for you to use. + * In order to make use of this context you have to create a member field in your + * Active Object that has the type 'ActiveObjectContext', then an instance will + * be injected for you to use. *

- * This class does not contain static information but is updated by the runtime system - * at runtime. + * This class does not contain static information but is updated by the runtime system + * at runtime. *

- * Here is an example of usage: + * Here is an example of usage: *

  * class Ping {
- *   // This context will be injected, holds RTTI (runtime type information) 
- *   // for the current message send 
+ *   // This context will be injected, holds RTTI (runtime type information)
+ *   // for the current message send
  *   private ActiveObjectContext context = null;
- *   
+ *
  *   public void hit(int count) {
  *     Pong pong = (Pong) context.getSender();
  *     pong.hit(count++)
@@ -100,19 +100,19 @@ final class ActiveObjectContext {
    * Returns the current sender Active Object reference.
    * Scala style getter.
    */
-  def sender: AnyRef = { 
+  def sender: AnyRef = {
     if (_sender eq null) throw new IllegalStateException("Sender reference should not be null.")
     else _sender
-  } 
+  }
 
   /**
    * Returns the current sender Active Object reference.
    * Java style getter.
    */
-   def getSender: AnyRef = { 
+   def getSender: AnyRef = {
      if (_sender eq null) throw new IllegalStateException("Sender reference should not be null.")
      else _sender
-   } 
+   }
 
   /**
    * Returns the current sender future Active Object reference.
@@ -364,7 +364,7 @@ object ActiveObject extends Logging {
     proxy.asInstanceOf[T]
   }
 
-  private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actorRef: ActorRef, 
+  private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actorRef: ActorRef,
                                    remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
     val context = injectActiveObjectContext(target)
     val proxy = Proxy.newInstance(Array(intf), Array(target), false, false)
@@ -462,7 +462,7 @@ object ActiveObject extends Logging {
         if (parent != null) injectActiveObjectContext0(activeObject, parent)
         else {
           log.warning(
-          "Can't set 'ActiveObjectContext' for ActiveObject [%s] since no field of this type could be found.", 
+          "Can't set 'ActiveObjectContext' for ActiveObject [%s] since no field of this type could be found.",
           activeObject.getClass.getName)
           None
         }
@@ -529,7 +529,7 @@ private[akka] sealed class ActiveObjectAspect {
       remoteAddress = init.remoteAddress
       timeout = init.timeout
       isInitialized = true
-      
+
     }
     dispatch(joinPoint)
   }
@@ -590,7 +590,7 @@ private[akka] sealed class ActiveObjectAspect {
     } else future.result
 
   private def isVoid(rtti: MethodRtti) = rtti.getMethod.getReturnType == java.lang.Void.TYPE
-  
+
   private def escapeArguments(args: Array[AnyRef]): Tuple2[Array[AnyRef], Boolean] = {
     var isEscaped = false
     val escapedArgs = for (arg <- args) yield {
@@ -613,11 +613,11 @@ private[akka] sealed class ActiveObjectAspect {
   joinPoint: JoinPoint, isOneWay: Boolean, isVoid: Boolean, sender: AnyRef, senderFuture: CompletableFuture[Any]) {
 
   override def toString: String = synchronized {
-    "Invocation [joinPoint: " + joinPoint.toString + 
-    ", isOneWay: " + isOneWay + 
+    "Invocation [joinPoint: " + joinPoint.toString +
+    ", isOneWay: " + isOneWay +
     ", isVoid: " + isVoid +
-    ", sender: " + sender + 
-    ", senderFuture: " + senderFuture + 
+    ", sender: " + sender +
+    ", senderFuture: " + senderFuture +
     "]"
   }
 
@@ -660,11 +660,11 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
   private var postRestart: Option[Method] = None
   private var initTxState: Option[Method] = None
   private var context: Option[ActiveObjectContext] = None
-  
+
   def this(transactionalRequired: Boolean) = this(transactionalRequired,None)
 
   private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef, ctx: Option[ActiveObjectContext]) = {
-    if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired)) 
+    if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired))
       self.makeTransactionRequired
     self.id = targetClass.getName
     target = Some(targetInstance)
@@ -712,7 +712,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
 
   def receive = {
     case Invocation(joinPoint, isOneWay, _, sender, senderFuture) =>
-      context.foreach { ctx => 
+      context.foreach { ctx =>
         if (sender ne null) ctx._sender = sender
         if (senderFuture ne null) ctx._senderFuture = senderFuture
       }
diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala
index 2a9e70bde9..3b7d62f97f 100644
--- a/akka-core/src/main/scala/actor/Actor.scala
+++ b/akka-core/src/main/scala/actor/Actor.scala
@@ -295,8 +295,10 @@ trait Actor extends Logging {
   type Receive = Actor.Receive
 
    /*
-    * For internal use only, functions as the implicit sender references when invoking
-    * one of the message send functions (!, !! and !!!).
+    * Option[ActorRef] representation of the 'self' ActorRef reference.
+    * 

+ * Mainly for internal use, functions as the implicit sender references when invoking + * one of the message send functions ('!', '!!' and '!!!'). */ implicit val optionSelf: Option[ActorRef] = { val ref = Actor.actorRefInCreation.value @@ -313,8 +315,10 @@ trait Actor extends Logging { } /* - * For internal use only, functions as the implicit sender references when invoking - * the forward function. + * Some[ActorRef] representation of the 'self' ActorRef reference. + *

+ * Mainly for internal use, functions as the implicit sender references when invoking + * the 'forward' function. */ implicit val someSelf: Some[ActorRef] = optionSelf.asInstanceOf[Some[ActorRef]] @@ -325,9 +329,31 @@ trait Actor extends Logging { *

    * self ! message
    * 
+ * Here you also find most of the Actor API. + *

+ * For example fields like: + *

+   * self.dispactcher = ...
+   * self.trapExit = ...
+   * self.faultHandler = ...
+   * self.lifeCycle = ...
+   * self.sender
+   * 
+ *

+ * Here you also find methods like: + *

+   * self.reply(..)
+   * self.link(..)
+   * self.unlink(..)
+   * self.start(..)
+   * self.stop(..)
+   * 
*/ - val self: ActorRef = optionSelf.get - self.id = getClass.getName + val self: ActorRef = { + val zelf = optionSelf.get + zelf.id = getClass.getName + zelf + } /** * User overridable callback/setting. @@ -339,64 +365,64 @@ trait Actor extends Logging { *
    *   def receive = {
    *     case Ping =>
-   *       println("got a ping")
+   *       log.info("got a 'Ping' message")
    *       self.reply("pong")
    *
    *     case OneWay =>
-   *       println("got a oneway")
+   *       log.info("got a 'OneWay' message")
    *
-   *     case _ =>
-   *       println("unknown message, ignoring")
+   *     case unknown =>
+   *       log.warning("unknown message [%s], ignoring", unknown)
    * }
    * 
*/ protected def receive: Receive /** - * User overridable callback/setting. + * User overridable callback. *

- * Optional callback method that is called during initialization. - * To be implemented by subclassing actor. + * Is called when an Actor is started by invoking 'actor.start'. */ def init {} /** - * User overridable callback/setting. + * User overridable callback. *

- * Mandatory callback method that is called during restart and reinitialization after a server crash. - * To be implemented by subclassing actor. + * Is called when 'actor.stop' is invoked. + */ + def shutdown {} + + /** + * User overridable callback. + *

+ * Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated. */ def preRestart(reason: Throwable) {} /** - * User overridable callback/setting. + * User overridable callback. *

- * Mandatory callback method that is called during restart and reinitialization after a server crash. - * To be implemented by subclassing actor. + * Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash. */ def postRestart(reason: Throwable) {} /** - * User overridable callback/setting. + * User overridable callback. *

- * Optional callback method that is called during termination. - * To be implemented by subclassing actor. + * Is called during initialization. Can be used to initialize transactional state. Will be invoked within a transaction. */ def initTransactionalState {} - /** - * User overridable callback/setting. - *

- * Optional callback method that is called during termination. - * To be implemented by subclassing actor. - */ - def shutdown {} - // ========================================= // ==== INTERNAL IMPLEMENTATION DETAILS ==== // ========================================= - private[akka] def base: Receive = lifeCycles orElse (self.hotswap getOrElse receive) + private[akka] def base: Receive = try { + lifeCycles orElse (self.hotswap getOrElse receive) + } catch { + case e: NullPointerException => throw new IllegalStateException( + "The 'self' ActorRef reference for [" + getClass.getName + "] is NULL, error in the ActorRef initialization process.") + } private val lifeCycles: Receive = { case HotSwap(code) => self.hotswap = code @@ -414,39 +440,3 @@ trait Actor extends Logging { override def toString = self.toString } - -/** - * Base class for the different dispatcher types. - * - * @author Jonas Bonér - */ -sealed abstract class DispatcherType - -/** - * Module that holds the different dispatcher types. - * - * @author Jonas Bonér - */ -object DispatcherType { - case object EventBasedThreadPooledProxyInvokingDispatcher extends DispatcherType - case object EventBasedSingleThreadDispatcher extends DispatcherType - case object EventBasedThreadPoolDispatcher extends DispatcherType - case object ThreadBasedDispatcher extends DispatcherType -} - -/** - * Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model': - * http://en.wikipedia.org/wiki/Actor_model - *

- * An actor has a well-defined (non-cyclic) life-cycle. - *

- * => NEW (newly created actor) - can't receive messages (yet)
- *     => STARTED (when 'start' is invoked) - can receive messages
- *         => SHUT DOWN (when 'exit' is invoked) - can't do anything
- * 
- * - * @author Jonas Bonér - */ -class ActorMessageInvoker private[akka] (val actorRef: ActorRef) extends MessageInvoker { - def invoke(handle: MessageInvocation) = actorRef.invoke(handle) -} diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index ec96b87bf2..682e85655c 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -40,7 +40,7 @@ import java.lang.reflect.Field *

* Protobuf Message -> ActorRef: *

- *   val actorRef = ActorRef.fromProtocol(protobufMessage)
+ *   val actorRef = ActorRef.fromProtobuf(protobufMessage)
  *   actorRef ! message // send message to remote actor through its reference
  * 
* @author Jonas Bonér @@ -51,15 +51,15 @@ object ActorRef { * Deserializes the ActorRef instance from a byte array (Array[Byte]) into an ActorRef instance. */ def fromBinary(bytes: Array[Byte]): ActorRef = - fromProtocol(ActorRefProtocol.newBuilder.mergeFrom(bytes).build, None) + fromProtobuf(ActorRefProtocol.newBuilder.mergeFrom(bytes).build, None) def fromBinary(bytes: Array[Byte], loader: ClassLoader): ActorRef = - fromProtocol(ActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader)) + fromProtobuf(ActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader)) /** * Deserializes the ActorRef instance from a Protocol Buffers (protobuf) Message into an ActorRef instance. */ - private[akka] def fromProtocol(protocol: ActorRefProtocol, loader: Option[ClassLoader]): ActorRef = + private[akka] def fromProtobuf(protocol: ActorRefProtocol, loader: Option[ClassLoader]): ActorRef = RemoteActorRef( protocol.getUuid, protocol.getActorClassName, @@ -222,7 +222,7 @@ trait ActorRef extends TransactionManagement { * Is defined if the message was sent with sent with '!!' or '!!!', else None. */ def senderFuture: Option[CompletableFuture[Any]] = guard.withGuard { _senderFuture } - + /** * Is the actor being restarted? */ @@ -356,7 +356,7 @@ trait ActorRef extends TransactionManagement { "\n\tYou have probably: " + "\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." + "\n\t\t2. Invoked a method on an Active Object from an instance NOT an Active Object.") - + /** * Use reply_?(..) to reply with a message to the original sender of the message currently * being processed. @@ -527,7 +527,7 @@ trait ActorRef extends TransactionManagement { */ def shutdownLinkedActors: Unit - protected[akka] def toProtocol: ActorRefProtocol + protected[akka] def toProtobuf: ActorRefProtocol protected[akka] def invoke(messageHandle: MessageInvocation): Unit @@ -572,7 +572,7 @@ trait ActorRef extends TransactionManagement { protected def processSender(senderOption: Option[ActorRef], requestBuilder: RemoteRequestProtocol.Builder) = { senderOption.foreach { sender => RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender) - requestBuilder.setSender(sender.toProtocol) + requestBuilder.setSender(sender.toProtobuf) } } } @@ -595,7 +595,7 @@ sealed class LocalActorRef private[akka]( @volatile private[akka] var _supervisor: Option[ActorRef] = None protected[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation] - protected[this] val actorInstance = new AtomicReference[Actor](newActor) + protected[this] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) } @volatile private var isInInitialization = false @volatile private var runActorInitialization = false @@ -609,7 +609,7 @@ sealed class LocalActorRef private[akka]( /** * Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message. */ - protected[akka] def toProtocol: ActorRefProtocol = guard.withGuard { + protected[akka] def toProtobuf: ActorRefProtocol = guard.withGuard { val host = homeAddress.getHostName val port = homeAddress.getPort @@ -637,7 +637,7 @@ sealed class LocalActorRef private[akka]( /** * Serializes the ActorRef instance into a byte array (Array[Byte]). */ - def toBinary: Array[Byte] = toProtocol.toByteArray + def toBinary: Array[Byte] = toProtobuf.toByteArray /** * Returns the class for the Actor instance that is managed by the ActorRef. @@ -947,7 +947,7 @@ sealed class LocalActorRef private[akka]( .setIsOneWay(false) .setIsEscaped(false) - //senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) + //senderOption.foreach(sender => requestBuilder.setSender(sender.toProtobuf)) RemoteProtocolBuilder.setMessage(message, requestBuilder) val id = registerSupervisorAsRemoteActor @@ -1224,7 +1224,7 @@ private[akka] case class RemoteActorRef private[akka] ( extends ActorRef { _uuid = uuuid timeout = _timeout - + start lazy val remoteClient = RemoteClient.clientFor(hostname, port, loader) @@ -1292,7 +1292,7 @@ private[akka] case class RemoteActorRef private[akka] ( def mailboxSize: Int = unsupported def supervisor: Option[ActorRef] = unsupported def shutdownLinkedActors: Unit = unsupported - protected[akka] def toProtocol: ActorRefProtocol = unsupported + protected[akka] def toProtobuf: ActorRefProtocol = unsupported protected[akka] def mailbox: Deque[MessageInvocation] = unsupported protected[akka] def restart(reason: Throwable): Unit = unsupported protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index 5943985b57..b9827fdb9c 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -63,6 +63,12 @@ object ActorRegistry extends ListenerManagement { all.toList } + /** + * Finds any actor that matches T. + */ + def actorFor[T <: Actor](implicit manifest: Manifest[T]): Option[ActorRef] = + actorsFor[T](manifest).headOption + /** * Finds all actors of the exact type specified by the class passed in as the Class argument. */ diff --git a/akka-core/src/main/scala/actor/Agent.scala b/akka-core/src/main/scala/actor/Agent.scala index e65a5a5b74..b800c94f23 100644 --- a/akka-core/src/main/scala/actor/Agent.scala +++ b/akka-core/src/main/scala/actor/Agent.scala @@ -88,7 +88,8 @@ class AgentException private[akka](message: String) extends RuntimeException(mes *
*

* -* IMPORTANT: +* IMPORTANT: +*

* You can *not* call 'agent.get', 'agent()' or use the monadic 'foreach', * 'map' and 'flatMap' within an enclosing transaction since that would block * the transaction indefinitely. But all other operations are fine. The system @@ -103,7 +104,6 @@ sealed class Agent[T] private (initialValue: T) { import Actor._ private val dispatcher = actorOf(new AgentDispatcher[T](initialValue)).start - dispatcher ! Value(initialValue) /** * Submits a request to read the internal state. @@ -215,7 +215,7 @@ final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transacto import Actor._ log.debug("Starting up Agent [%s]", self.uuid) - private lazy val value = Ref[T]() + private val value = Ref[T](initialValue) /** * Periodically handles incoming messages. @@ -233,6 +233,5 @@ final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transacto * Performs a CAS operation, atomically swapping the internal state with the value * provided as a by-name parameter. */ - private final def swap(newData: => T): Unit = value.swap(newData) + private def swap(newData: => T): Unit = value.swap(newData) } - diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala index c0cb14118a..5607c17e96 100644 --- a/akka-core/src/main/scala/actor/Supervisor.scala +++ b/akka-core/src/main/scala/actor/Supervisor.scala @@ -47,46 +47,6 @@ object Supervisor { def apply(config: SupervisorConfig): Supervisor = SupervisorFactory(config).newInstance.start } -/** - * Factory object for creating supervisors as Actors, it has both a declarative and programatic API. - *

- * - * Here is a sample on how to use the programmatic API (note that the supervisor is automatically started): - *

- * val supervisor = SupervisorActor(AllForOneStrategy(maxNrOfRetries, timeRange), Array(classOf[Throwable]))
- *
- * Here is a sample on how to use the declarative API:
- * 
- *  val supervisor = SupervisorActor(
- *    SupervisorConfig(
- *      RestartStrategy(OneForOne, 3, 10, List(classOf[Exception]),
- *      Supervise(
- *        myFirstActor,
- *        LifeCycle(Permanent)) ::
- *      Supervise(
- *        mySecondActor,
- *        LifeCycle(Permanent)) ::
- *      Nil))
- * 
- * - * You dynamically link and unlink child children using the 'link' and 'unlink' methods. - *
- * supervisor.link(child)
- * supervisor.unlink(child)
- * 
- * - * @author Jonas Bonér - */ -object SupervisorActor { - def apply(config: SupervisorConfig): ActorRef = { - val (handler, trapExits) = SupervisorFactory.retrieveFaultHandlerAndTrapExitsFrom(config) - actorOf(new SupervisorActor(handler, trapExits)).start - } - - def apply(handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]]): ActorRef = - actorOf(new SupervisorActor(handler, trapExceptions)).start -} - /** * Use this factory instead of the Supervisor factory object if you want to control * instantiation and starting of the Supervisor, if not then it is easier and better @@ -166,7 +126,7 @@ sealed class Supervisor private[akka] ( private val childActors = new ConcurrentHashMap[String, List[ActorRef]] private val childSupervisors = new CopyOnWriteArrayList[Supervisor] - private[akka] val supervisor = SupervisorActor(handler, trapExceptions) + private[akka] val supervisor = actorOf(new SupervisorActor(handler, trapExceptions)).start def uuid = supervisor.uuid @@ -217,19 +177,7 @@ sealed class Supervisor private[akka] ( } /** - * Use this class when you want to create a supervisor dynamically that should only - * manage its child children and not have any functionality by itself. - *

- * Here is a sample on how to use it: - *

- * val supervisor = Supervisor(AllForOneStrategy(maxNrOfRetries, timeRange), Array(classOf[Throwable]))
- * 
- * - * You dynamically link and unlink child children using the 'link' and 'unlink' methods. - *
- * supervisor.link(child)
- * supervisor.unlink(child)
- * 
+ * For internal use only. * * @author Jonas Bonér */ diff --git a/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala index 72e94526ce..832ae9203a 100644 --- a/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala @@ -7,19 +7,19 @@ package se.scalablesolutions.akka.dispatch import java.util.{LinkedList, Queue, List} import java.util.HashMap -import se.scalablesolutions.akka.actor.{ActorMessageInvoker, Actor, ActorRef} +import se.scalablesolutions.akka.actor.{Actor, ActorRef} abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) extends MessageDispatcher { @volatile protected var active: Boolean = false protected val queue = new ReactiveMessageQueue(name) - protected val messageInvokers = new HashMap[AnyRef, MessageInvoker] + protected val messageInvokers = new HashMap[ActorRef, ActorRef] protected var selectorThread: Thread = _ protected val guard = new Object def dispatch(invocation: MessageInvocation) = queue.append(invocation) override def register(actorRef: ActorRef) = synchronized { - messageInvokers.put(actorRef, new ActorMessageInvoker(actorRef)) + messageInvokers.put(actorRef, actorRef) super.register(actorRef) } diff --git a/akka-core/src/main/scala/dispatch/Dispatchers.scala b/akka-core/src/main/scala/dispatch/Dispatchers.scala index 461cdef5ff..95e77625ec 100644 --- a/akka-core/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-core/src/main/scala/dispatch/Dispatchers.scala @@ -40,7 +40,7 @@ import se.scalablesolutions.akka.config.Config.config * @author Jonas Bonér */ object Dispatchers { - val THROUGHPUT = config.getInt("akka.dispatch.throughput", 5) + val THROUGHPUT = config.getInt("akka.dispatcher.throughput", 5) object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") { override def register(actor: ActorRef) = { diff --git a/akka-core/src/main/scala/dispatch/MessageHandling.scala b/akka-core/src/main/scala/dispatch/MessageHandling.scala index 8e094fe108..fbf52224ce 100644 --- a/akka-core/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-core/src/main/scala/dispatch/MessageHandling.scala @@ -13,6 +13,9 @@ import java.util.concurrent.ConcurrentHashMap import org.multiverse.commitbarriers.CountDownCommitBarrier +/** + * @author Jonas Bonér + */ final class MessageInvocation(val receiver: ActorRef, val message: Any, val sender: Option[ActorRef], @@ -49,14 +52,16 @@ final class MessageInvocation(val receiver: ActorRef, } } +/** + * @author Jonas Bonér + */ trait MessageQueue { def append(handle: MessageInvocation) } -trait MessageInvoker { - def invoke(message: MessageInvocation) -} - +/** + * @author Jonas Bonér + */ trait MessageDispatcher extends Logging { protected val references = new ConcurrentHashMap[String, ActorRef] def dispatch(invocation: MessageInvocation) @@ -65,14 +70,16 @@ trait MessageDispatcher extends Logging { def register(actorRef: ActorRef) = references.put(actorRef.uuid, actorRef) def unregister(actorRef: ActorRef) = { references.remove(actorRef.uuid) - if (canBeShutDown) - shutdown // shut down in the dispatcher's references is zero + if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero } def canBeShutDown: Boolean = references.isEmpty def isShutdown: Boolean def usesActorMailbox : Boolean } +/** + * @author Jonas Bonér + */ trait MessageDemultiplexer { def select def wakeUp diff --git a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala index f5d8c034c1..5c1cb78a52 100644 --- a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -7,17 +7,16 @@ package se.scalablesolutions.akka.dispatch import java.util.concurrent.LinkedBlockingQueue import java.util.Queue -import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorMessageInvoker} +import se.scalablesolutions.akka.actor.{Actor, ActorRef} /** * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. * * @author Jonas Bonér */ -class ThreadBasedDispatcher(actor: ActorRef) extends MessageDispatcher { +class ThreadBasedDispatcher(private val actor: ActorRef) extends MessageDispatcher { private val name = actor.getClass.getName + ":" + actor.uuid private val threadName = "thread-based:dispatcher:" + name - private val messageHandler = new ActorMessageInvoker(actor) private val queue = new BlockingMessageQueue(name) private var selectorThread: Thread = _ @volatile private var active: Boolean = false @@ -30,7 +29,7 @@ class ThreadBasedDispatcher(actor: ActorRef) extends MessageDispatcher { override def run = { while (active) { try { - messageHandler.invoke(queue.take) + actor.invoke(queue.take) } catch { case e: InterruptedException => active = false } } } diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index 66f7b59baa..32380bec7d 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -261,11 +261,11 @@ object Cluster extends Cluster with Logging { sup <- createSupervisor(actorRef) } { val serializer = Class.forName(config.getString( - "akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)) - .newInstance.asInstanceOf[Serializer] + "akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)) + .newInstance.asInstanceOf[Serializer] - classLoader = serializerClassLoader orElse classLoader - serializer.classLoader = classLoader + classLoader = serializerClassLoader orElse classLoader + serializer.classLoader = classLoader actorRef.start sup.start actorRef ! InitClusterActor(serializer) diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index f93c5dc345..15ed0409ed 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -82,19 +82,19 @@ object RemoteClient extends Logging { private[akka] def actorFor(uuid: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef = RemoteActorRef(uuid, className, hostname, port, timeout, loader) - def clientFor(hostname: String, port: Int): RemoteClient = + def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port), None) - def clientFor(hostname: String, port: Int, loader: ClassLoader): RemoteClient = + def clientFor(hostname: String, port: Int, loader: ClassLoader): RemoteClient = clientFor(new InetSocketAddress(hostname, port), Some(loader)) - def clientFor(address: InetSocketAddress): RemoteClient = + def clientFor(address: InetSocketAddress): RemoteClient = clientFor(address, None) - def clientFor(address: InetSocketAddress, loader: ClassLoader): RemoteClient = + def clientFor(address: InetSocketAddress, loader: ClassLoader): RemoteClient = clientFor(address, Some(loader)) - private[akka] def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient = + private[akka] def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient = clientFor(new InetSocketAddress(hostname, port), loader) private[akka] def clientFor(address: InetSocketAddress, loader: Option[ClassLoader]): RemoteClient = synchronized { @@ -330,7 +330,7 @@ class RemoteClientHandler(val name: String, client.connection = bootstrap.connect(remoteAddress) client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails. if (!client.connection.isSuccess) { - client.listeners.toArray.foreach(l => + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause)) log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) } @@ -339,13 +339,13 @@ class RemoteClientHandler(val name: String, } override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.listeners.toArray.foreach(l => + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientConnected(client.hostname, client.port)) log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress) } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.listeners.toArray.foreach(l => + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientDisconnected(client.hostname, client.port)) log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress) } diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 782775e2a3..8f6cd40684 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -184,7 +184,7 @@ class RemoteServer extends Logging { def start(_hostname: String, _port: Int): RemoteServer = start(_hostname, _port, None) - private def start(_hostname: String, _port: Int, loader: ClassLoader): RemoteServer = + private def start(_hostname: String, _port: Int, loader: ClassLoader): RemoteServer = start(_hostname, _port, Some(loader)) private def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): RemoteServer = synchronized { @@ -366,7 +366,7 @@ class RemoteServerHandler( val message = RemoteProtocolBuilder.getMessage(request) if (request.hasSender) { val sender = request.getSender - if (sender ne null) actorRef.!(message)(Some(ActorRef.fromProtocol(sender, applicationLoader))) + if (sender ne null) actorRef.!(message)(Some(ActorRef.fromProtobuf(sender, applicationLoader))) } else { try { val resultOrNone = actorRef !! message diff --git a/akka-core/src/main/scala/routing/Listeners.scala b/akka-core/src/main/scala/routing/Listeners.scala index 4bfc3f30b6..7563e0800b 100644 --- a/akka-core/src/main/scala/routing/Listeners.scala +++ b/akka-core/src/main/scala/routing/Listeners.scala @@ -13,7 +13,7 @@ case class Listen(listener: ActorRef) extends ListenerMessage case class Deafen(listener: ActorRef) extends ListenerMessage case class WithListeners(f: List[ActorRef] => Unit) extends ListenerMessage -/** +/** * Listeners is a generic trait to implement listening capability on an Actor. *

* Use the gossip(msg) method to have it sent to the listeners. @@ -34,6 +34,6 @@ trait Listeners { self: Actor => } protected def gossip(msg: Any) = listenersAsList foreach (_ ! msg) - + private def listenersAsList: List[ActorRef] = listeners.toArray.toList.asInstanceOf[List[ActorRef]] } diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala index dfd6c53fdf..439093dec6 100644 --- a/akka-core/src/main/scala/stm/Transaction.scala +++ b/akka-core/src/main/scala/stm/Transaction.scala @@ -282,10 +282,10 @@ object Transaction { setTransaction(Some(tx)) mtx.registerLifecycleListener(new TransactionLifecycleListener() { def notify(mtx: MultiverseTransaction, event: TransactionLifecycleEvent) = event.name match { - case "postCommit" => + case "postCommit" => log.trace("Committing transaction [%s]", mtx) tx.commit - case "postAbort" => + case "postAbort" => log.trace("Aborting transaction [%s]", mtx) tx.abort case _ => {} @@ -309,7 +309,7 @@ object Transaction { val id = Transaction.idFactory.incrementAndGet @volatile private[this] var status: TransactionStatus = TransactionStatus.New private[akka] var transaction: Option[MultiverseTransaction] = None - private[this] val persistentStateMap = new HashMap[String, Committable] + private[this] val persistentStateMap = new HashMap[String, Committable with Abortable] private[akka] val depth = new AtomicInteger(0) val jta: Option[TransactionContainer] = @@ -329,9 +329,7 @@ object Transaction { def commit = synchronized { log.trace("Committing transaction %s", toString) - Transaction.atomic0 { - persistentStateMap.valuesIterator.foreach(_.commit) - } + persistentStateMap.valuesIterator.foreach(_.commit) status = TransactionStatus.Completed jta.foreach(_.commit) } @@ -339,6 +337,8 @@ object Transaction { def abort = synchronized { log.trace("Aborting transaction %s", toString) jta.foreach(_.rollback) + persistentStateMap.valuesIterator.foreach(_.abort) + persistentStateMap.clear } def isNew = synchronized { status == TransactionStatus.New } @@ -361,7 +361,7 @@ object Transaction { private[akka] def isTopLevel = depth.get == 0 - private[akka] def register(uuid: String, storage: Committable) = persistentStateMap.put(uuid, storage) + private[akka] def register(uuid: String, storage: Committable with Abortable) = persistentStateMap.put(uuid, storage) private def ensureIsActive = if (status != TransactionStatus.Active) throw new StmConfigurationException( diff --git a/akka-core/src/main/scala/stm/TransactionalState.scala b/akka-core/src/main/scala/stm/TransactionalState.scala index e3e3f4ac7f..8f449db1b1 100644 --- a/akka-core/src/main/scala/stm/TransactionalState.scala +++ b/akka-core/src/main/scala/stm/TransactionalState.scala @@ -56,6 +56,13 @@ trait Committable { def commit: Unit } +/** + * @author Jonas Bonér + */ +trait Abortable { + def abort: Unit +} + object RefFactory { private val factory = getGlobalStmInstance.getProgrammaticReferenceFactoryBuilder.build diff --git a/akka-core/src/test/scala/ActorPatternsTest.scala b/akka-core/src/test/scala/ActorPatternsTest.scala index fac59a163c..0d4e9b6b08 100644 --- a/akka-core/src/test/scala/ActorPatternsTest.scala +++ b/akka-core/src/test/scala/ActorPatternsTest.scala @@ -21,14 +21,14 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat val (testMsg1,testMsg2,testMsg3,testMsg4) = ("test1","test2","test3","test4") val targetOk = new AtomicInteger(0) val t1 = actorOf( new Actor() { - def receive = { + def receive = { case `testMsg1` => self.reply(3) case `testMsg2` => self.reply(7) } } ).start val t2 = actorOf( new Actor() { - def receive = { + def receive = { case `testMsg3` => self.reply(11) } }).start @@ -43,9 +43,9 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat b <- (d.!![Int](testMsg2,5000)) c <- (d.!![Int](testMsg3,5000)) } yield a + b + c - + result.get must be(21) - for(a <- List(t1,t2,d)) a.stop + for(a <- List(t1,t2,d)) a.stop } @Test def testLogger = { diff --git a/akka-core/src/test/scala/StmSpec.scala b/akka-core/src/test/scala/StmSpec.scala index 401545d50c..17d4be32bd 100644 --- a/akka-core/src/test/scala/StmSpec.scala +++ b/akka-core/src/test/scala/StmSpec.scala @@ -95,16 +95,42 @@ class StmSpec extends val size2: Int = (actor !! Size).getOrElse(fail("Could not get Vector::size")) size2 should equal(3) } catch { - case e => + case e => e.printStackTrace fail(e.toString) } } } + + describe("Transactor") { + it("should be able receive message sent with !! and pass it along to nested transactor with !! and receive reply; multipse times in a row") { + import GlobalTransactionVectorTestActor._ + try { + val actor = actorOf[NestedTransactorLevelOneActor].start + actor !! Add(2) + val size1: Int = (actor !! Size).getOrElse(fail("Could not get size")) + size1 should equal(2) + actor !! Add(7) + actor ! "HiLevelOne" + val size2: Int = (actor !! Size).getOrElse(fail("Could not get size")) + size2 should equal(7) + actor !! Add(0) + actor ! "HiLevelTwo" + val size3: Int = (actor !! Size).getOrElse(fail("Could not get size")) + size3 should equal(0) + actor !! Add(3) + val size4: Int = (actor !! Size).getOrElse(fail("Could not get size")) + size4 should equal(3) + } catch { + case e => + fail(e.toString) + } + } + } /* describe("Multiverse API") { it("should blablabla") { - + import org.multiverse.api.programmatic._ // import org.multiverse.api._ import org.multiverse.templates._ @@ -113,13 +139,13 @@ class StmSpec extends import org.multiverse.api.{GlobalStmInstance, ThreadLocalTransaction, Transaction => MultiverseTransaction} import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent} import org.multiverse.commitbarriers._ - + def createRef[T]: ProgrammaticReference[T] = GlobalStmInstance .getGlobalStmInstance .getProgrammaticReferenceFactoryBuilder .build .atomicCreateReference(null.asInstanceOf[T]) - + val ref1 = Ref(0)//createRef[Int] val ref2 = Ref(0)//createRef[Int] @@ -158,15 +184,47 @@ class GlobalTransactionVectorTestActor extends Actor { import GlobalTransactionVectorTestActor._ import se.scalablesolutions.akka.stm.Transaction.Global - private var vector: TransactionalVector[Int] = Global.atomic { TransactionalVector(1) } - + private val vector: TransactionalVector[Int] = Global.atomic { TransactionalVector(1) } + def receive = { - case Add(value) => + case Add(value) => Global.atomic { vector + value} self.reply(Success) - case Size => + case Size => val size = Global.atomic { vector.size } self.reply(size) } } + +class NestedTransactorLevelOneActor extends Actor { + import GlobalTransactionVectorTestActor._ + private val nested = actorOf[NestedTransactorLevelTwoActor].start + + def receive = { + case add @ Add(_) => + self.reply((nested !! add).get) + + case Size => + self.reply((nested !! Size).get) + + case "HiLevelOne" => println("HiLevelOne") + case "HiLevelTwo" => nested ! "HiLevelTwo" + } +} + +class NestedTransactorLevelTwoActor extends Actor { + import GlobalTransactionVectorTestActor._ + private val ref = Ref(0) + + def receive = { + case Add(value) => + ref.swap(value) + self.reply(Success) + + case Size => + self.reply(ref.getOrElse(-1)) + + case "HiLevelTwo" => println("HiLevelTwo") + } +} diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/JerseyFoo.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/JerseyFoo.java deleted file mode 100644 index 6828ba421f..0000000000 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/JerseyFoo.java +++ /dev/null @@ -1,14 +0,0 @@ -package se.scalablesolutions.akka.api; - -import javax.ws.rs.Path; -import javax.ws.rs.GET; -import javax.ws.rs.Produces; - -@Path("/foo") -public class JerseyFoo { - @GET - @Produces({"application/json"}) - public String foo() { - return "hello foo"; - } -} \ No newline at end of file diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java deleted file mode 100644 index 080c1cbd0b..0000000000 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java +++ /dev/null @@ -1,11 +0,0 @@ -package se.scalablesolutions.akka.api; - -public class PersistenceManager { - private static volatile boolean isRunning = false; - public static void init() { - if (!isRunning) { - se.scalablesolutions.akka.kernel.Kernel$.MODULE$.startRemoteService(); - isRunning = true; - } - } -} diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java deleted file mode 100644 index d5c1bdf00c..0000000000 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java +++ /dev/null @@ -1,26 +0,0 @@ -package se.scalablesolutions.akka.api; - -import se.scalablesolutions.akka.persistence.common.*; -import se.scalablesolutions.akka.persistence.cassandra.*; -import se.scalablesolutions.akka.actor.annotation.inittransactionalstate; - -public class PersistentClasher { - private PersistentMap state; - - @inittransactionalstate - public void init() { - state = CassandraStorage.newMap(); - } - - public String getState(String key) { - return (String)state.get(key).get(); - } - - public void setState(String key, String msg) { - state.put(key, msg); - } - - public void clash() { - state.put("clasher", "was here"); - } -} \ No newline at end of file diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentFailer.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentFailer.java deleted file mode 100644 index 44cab7d6c3..0000000000 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentFailer.java +++ /dev/null @@ -1,7 +0,0 @@ -package se.scalablesolutions.akka.api; - -public class PersistentFailer implements java.io.Serializable { - public int fail() { - throw new RuntimeException("expected"); - } -} diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java deleted file mode 100644 index 796d3d913a..0000000000 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.api; - -import se.scalablesolutions.akka.config.*; -import se.scalablesolutions.akka.config.ActiveObjectConfigurator; -import static se.scalablesolutions.akka.config.JavaConfig.*; -import se.scalablesolutions.akka.actor.*; -import se.scalablesolutions.akka.kernel.Kernel; - -import junit.framework.TestCase; - -public class PersistentNestedStateTest extends TestCase { - static String messageLog = ""; - - final private ActiveObjectConfigurator conf = new ActiveObjectConfigurator(); - - protected void setUp() { - PersistenceManager.init(); - conf.configure( - new RestartStrategy(new AllForOne(), 3, 5000, new Class[] {Exception.class}), - new Component[]{ - new Component(PersistentStateful.class, new LifeCycle(new Permanent()), 10000000), - new Component(PersistentStatefulNested.class, new LifeCycle(new Permanent()), 10000000), - new Component(PersistentFailer.class, new LifeCycle(new Permanent()), 1000) - //new Component("inmem-clasher", InMemClasher.class, InMemClasherImpl.class, new LifeCycle(new Permanent()), 100000) - }).inject().supervise(); - } - - protected void tearDown() { - conf.stop(); - } - - public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() throws Exception { - PersistentStateful stateful = conf.getInstance(PersistentStateful.class); - PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class); - stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state - nested.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state - stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional - assertEquals("new state", nested.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")); - assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")); - } - - public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() { - PersistentStateful stateful = conf.getInstance(PersistentStateful.class); - stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state - PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class); - nested.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state - PersistentFailer failer = conf.getInstance(PersistentFailer.class); - try { - stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method - fail("should have thrown an exception"); - } catch (RuntimeException e) { - } // expected - assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state - assertEquals("init", nested.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state - } - - public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { - PersistentStateful stateful = conf.getInstance(PersistentStateful.class); - stateful.setVectorState("init"); // set init state - PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class); - nested.setVectorState("init"); // set init state - stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional - assertEquals(2, stateful.getVectorLength()); // BAD: keeps one element since last test - assertEquals(2, nested.getVectorLength()); - } - - public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() { - PersistentStateful stateful = conf.getInstance(PersistentStateful.class); - stateful.setVectorState("init"); // set init state - PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class); - nested.setVectorState("init"); // set init state - PersistentFailer failer = conf.getInstance(PersistentFailer.class); - try { - stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method - fail("should have thrown an exception"); - } catch (RuntimeException e) { - } // expected - assertEquals(1, stateful.getVectorLength()); - assertEquals(1, nested.getVectorLength()); - } - - public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { - PersistentStateful stateful = conf.getInstance(PersistentStateful.class); - PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class); - stateful.setRefState("init"); // set init state - nested.setRefState("init"); // set init state - stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional - assertEquals("new state", stateful.getRefState()); - assertEquals("new state", nested.getRefState()); - } - - public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() { - PersistentStateful stateful = conf.getInstance(PersistentStateful.class); - PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class); - stateful.setRefState("init"); // set init state - nested.setRefState("init"); // set init state - PersistentFailer failer = conf.getInstance(PersistentFailer.class); - try { - stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method - fail("should have thrown an exception"); - } catch (RuntimeException e) { - } // expected - assertEquals("init", stateful.getRefState()); // check that state is == init state - assertEquals("init", nested.getRefState()); // check that state is == init state - } -} diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java deleted file mode 100644 index 74f0afcf53..0000000000 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.api; - -import se.scalablesolutions.akka.config.*; -import static se.scalablesolutions.akka.config.JavaConfig.*; - -import junit.framework.TestCase; - -public class PersistentStateTest extends TestCase { - static String messageLog = ""; - - final private ActiveObjectConfigurator conf = new ActiveObjectConfigurator(); - - protected void setUp() { - PersistenceManager.init(); - conf.configure( - new RestartStrategy(new AllForOne(), 3, 5000, new Class[] {Exception.class}), - new Component[] { - new Component(PersistentStateful.class, new LifeCycle(new Permanent()), 10000000), - new Component(PersistentFailer.class, new LifeCycle(new Permanent()), 1000) - //new Component(PersistentClasher.class, new LifeCycle(new Permanent()), 100000) - }).supervise(); - } - - protected void tearDown() { - conf.stop(); - } - - public void testShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { - PersistentStateful stateful = conf.getInstance(PersistentStateful.class); - stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state - stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired - assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")); - } - - public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() { - PersistentStateful stateful = conf.getInstance(PersistentStateful.class); - stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state - PersistentFailer failer = conf.getInstance(PersistentFailer.class); - try { - stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method - fail("should have thrown an exception"); - } catch (RuntimeException e) { - } // expected - assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state - } - - public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { - PersistentStateful stateful = conf.getInstance(PersistentStateful.class); - stateful.setVectorState("init"); // set init state - stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired - assertEquals("init", stateful.getVectorState(0)); - assertEquals("new state", stateful.getVectorState(1)); - } - - public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() { - PersistentStateful stateful = conf.getInstance(PersistentStateful.class); - stateful.setVectorState("init"); // set init state - PersistentFailer failer = conf.getInstance(PersistentFailer.class); - try { - stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method - fail("should have thrown an exception"); - } catch (RuntimeException e) { - } // expected - assertEquals("init", stateful.getVectorState(0)); // check that state is == init state - } - - public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { - PersistentStateful stateful = conf.getInstance(PersistentStateful.class); - stateful.setRefState("init"); // set init state - stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired - assertEquals("new state", stateful.getRefState()); - } - - public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() { - PersistentStateful stateful = conf.getInstance(PersistentStateful.class); - stateful.setRefState("init"); // set init state - PersistentFailer failer = conf.getInstance(PersistentFailer.class); - try { - stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method - fail("should have thrown an exception"); - } catch (RuntimeException e) { - } // expected - assertEquals("init", stateful.getRefState()); // check that state is == init state - } -} diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java deleted file mode 100644 index 6a8d3353b7..0000000000 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java +++ /dev/null @@ -1,84 +0,0 @@ -package se.scalablesolutions.akka.api; - -import se.scalablesolutions.akka.actor.annotation.inittransactionalstate; -import se.scalablesolutions.akka.actor.annotation.transactionrequired; -import se.scalablesolutions.akka.persistence.common.*; -import se.scalablesolutions.akka.persistence.cassandra.*; - -@transactionrequired -public class PersistentStateful { - private PersistentMap mapState; - private PersistentVector vectorState; - private PersistentRef refState; - - @inittransactionalstate - public void init() { - mapState = CassandraStorage.newMap(); - vectorState = CassandraStorage.newVector(); - refState = CassandraStorage.newRef(); - } - - public String getMapState(String key) { - byte[] bytes = (byte[]) mapState.get(key.getBytes()).get(); - return new String(bytes, 0, bytes.length); - } - - public String getVectorState(int index) { - byte[] bytes = (byte[]) vectorState.get(index); - return new String(bytes, 0, bytes.length); - } - - public int getVectorLength() { - return vectorState.length(); - } - - public String getRefState() { - if (refState.isDefined()) { - byte[] bytes = (byte[]) refState.get().get(); - return new String(bytes, 0, bytes.length); - } else throw new IllegalStateException("No such element"); - } - - public void setMapState(String key, String msg) { - mapState.put(key.getBytes(), msg.getBytes()); - } - - public void setVectorState(String msg) { - vectorState.add(msg.getBytes()); - } - - public void setRefState(String msg) { - refState.swap(msg.getBytes()); - } - - public void success(String key, String msg) { - mapState.put(key.getBytes(), msg.getBytes()); - vectorState.add(msg.getBytes()); - refState.swap(msg.getBytes()); - } - - public String failure(String key, String msg, PersistentFailer failer) { - mapState.put(key.getBytes(), msg.getBytes()); - vectorState.add(msg.getBytes()); - refState.swap(msg.getBytes()); - failer.fail(); - return msg; - } - - public String success(String key, String msg, PersistentStatefulNested nested) { - mapState.put(key.getBytes(), msg.getBytes()); - vectorState.add(msg.getBytes()); - refState.swap(msg.getBytes()); - nested.success(key, msg); - return msg; - } - - public String failure(String key, String msg, PersistentStatefulNested nested, PersistentFailer failer) { - mapState.put(key.getBytes(), msg.getBytes()); - vectorState.add(msg.getBytes()); - refState.swap(msg.getBytes()); - nested.failure(key, msg, failer); - return msg; - } -} - diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java deleted file mode 100644 index bd931ef108..0000000000 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java +++ /dev/null @@ -1,70 +0,0 @@ -package se.scalablesolutions.akka.api; - -import se.scalablesolutions.akka.actor.annotation.inittransactionalstate; -import se.scalablesolutions.akka.actor.annotation.transactionrequired; -import se.scalablesolutions.akka.persistence.common.*; -import se.scalablesolutions.akka.persistence.cassandra.*; - -@transactionrequired -public class PersistentStatefulNested { - private PersistentMap mapState; - private PersistentVector vectorState; - private PersistentRef refState; - - @inittransactionalstate - public void init() { - mapState = CassandraStorage.newMap(); - vectorState = CassandraStorage.newVector(); - refState = CassandraStorage.newRef(); - } - - public String getMapState(String key) { - byte[] bytes = (byte[]) mapState.get(key.getBytes()).get(); - return new String(bytes, 0, bytes.length); - } - - - public String getVectorState(int index) { - byte[] bytes = (byte[]) vectorState.get(index); - return new String(bytes, 0, bytes.length); - } - - public int getVectorLength() { - return vectorState.length(); - } - - public String getRefState() { - if (refState.isDefined()) { - byte[] bytes = (byte[]) refState.get().get(); - return new String(bytes, 0, bytes.length); - } else throw new IllegalStateException("No such element"); - } - - public void setMapState(String key, String msg) { - mapState.put(key.getBytes(), msg.getBytes()); - } - - public void setVectorState(String msg) { - vectorState.add(msg.getBytes()); - } - - public void setRefState(String msg) { - refState.swap(msg.getBytes()); - } - - public String success(String key, String msg) { - mapState.put(key.getBytes(), msg.getBytes()); - vectorState.add(msg.getBytes()); - refState.swap(msg.getBytes()); - return msg; - } - - public String failure(String key, String msg, PersistentFailer failer) { - mapState.put(key.getBytes(), msg.getBytes()); - vectorState.add(msg.getBytes()); - refState.swap(msg.getBytes()); - failer.fail(); - return msg; - } -} - diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ProtobufProtocol.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ProtobufProtocol.java deleted file mode 100644 index e7b15ca1ef..0000000000 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ProtobufProtocol.java +++ /dev/null @@ -1,403 +0,0 @@ -// Generated by the protocol buffer compiler. DO NOT EDIT! - -package se.scalablesolutions.akka.api; - -public final class ProtobufProtocol { - private ProtobufProtocol() {} - public static void registerAllExtensions( - com.google.protobuf.ExtensionRegistry registry) { - } - public static final class ProtobufPOJO extends - com.google.protobuf.GeneratedMessage { - // Use ProtobufPOJO.newBuilder() to construct. - private ProtobufPOJO() {} - - private static final ProtobufPOJO defaultInstance = new ProtobufPOJO(); - public static ProtobufPOJO getDefaultInstance() { - return defaultInstance; - } - - public ProtobufPOJO getDefaultInstanceForType() { - return defaultInstance; - } - - public static final com.google.protobuf.Descriptors.Descriptor - getDescriptor() { - return se.scalablesolutions.akka.api.ProtobufProtocol.internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_descriptor; - } - - protected com.google.protobuf.GeneratedMessage.FieldAccessorTable - internalGetFieldAccessorTable() { - return se.scalablesolutions.akka.api.ProtobufProtocol.internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_fieldAccessorTable; - } - - // required uint64 id = 1; - public static final int ID_FIELD_NUMBER = 1; - private boolean hasId; - private long id_ = 0L; - public boolean hasId() { return hasId; } - public long getId() { return id_; } - - // required string name = 2; - public static final int NAME_FIELD_NUMBER = 2; - private boolean hasName; - private java.lang.String name_ = ""; - public boolean hasName() { return hasName; } - public java.lang.String getName() { return name_; } - - // required bool status = 3; - public static final int STATUS_FIELD_NUMBER = 3; - private boolean hasStatus; - private boolean status_ = false; - public boolean hasStatus() { return hasStatus; } - public boolean getStatus() { return status_; } - - public final boolean isInitialized() { - if (!hasId) return false; - if (!hasName) return false; - if (!hasStatus) return false; - return true; - } - - public void writeTo(com.google.protobuf.CodedOutputStream output) - throws java.io.IOException { - if (hasId()) { - output.writeUInt64(1, getId()); - } - if (hasName()) { - output.writeString(2, getName()); - } - if (hasStatus()) { - output.writeBool(3, getStatus()); - } - getUnknownFields().writeTo(output); - } - - private int memoizedSerializedSize = -1; - public int getSerializedSize() { - int size = memoizedSerializedSize; - if (size != -1) return size; - - size = 0; - if (hasId()) { - size += com.google.protobuf.CodedOutputStream - .computeUInt64Size(1, getId()); - } - if (hasName()) { - size += com.google.protobuf.CodedOutputStream - .computeStringSize(2, getName()); - } - if (hasStatus()) { - size += com.google.protobuf.CodedOutputStream - .computeBoolSize(3, getStatus()); - } - size += getUnknownFields().getSerializedSize(); - memoizedSerializedSize = size; - return size; - } - - public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom( - com.google.protobuf.ByteString data) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data).buildParsed(); - } - public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom( - com.google.protobuf.ByteString data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data, extensionRegistry) - .buildParsed(); - } - public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(byte[] data) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data).buildParsed(); - } - public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom( - byte[] data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data, extensionRegistry) - .buildParsed(); - } - public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(java.io.InputStream input) - throws java.io.IOException { - return newBuilder().mergeFrom(input).buildParsed(); - } - public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom( - java.io.InputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return newBuilder().mergeFrom(input, extensionRegistry) - .buildParsed(); - } - public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseDelimitedFrom(java.io.InputStream input) - throws java.io.IOException { - return newBuilder().mergeDelimitedFrom(input).buildParsed(); - } - public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseDelimitedFrom( - java.io.InputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return newBuilder().mergeDelimitedFrom(input, extensionRegistry) - .buildParsed(); - } - public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom( - com.google.protobuf.CodedInputStream input) - throws java.io.IOException { - return newBuilder().mergeFrom(input).buildParsed(); - } - public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return newBuilder().mergeFrom(input, extensionRegistry) - .buildParsed(); - } - - public static Builder newBuilder() { return Builder.create(); } - public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder(se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO prototype) { - return newBuilder().mergeFrom(prototype); - } - public Builder toBuilder() { return newBuilder(this); } - - public static final class Builder extends - com.google.protobuf.GeneratedMessage.Builder { - private se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO result; - - // Construct using se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.newBuilder() - private Builder() {} - - private static Builder create() { - Builder builder = new Builder(); - builder.result = new se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO(); - return builder; - } - - protected se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO internalGetResult() { - return result; - } - - public Builder clear() { - if (result == null) { - throw new IllegalStateException( - "Cannot call clear() after build()."); - } - result = new se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO(); - return this; - } - - public Builder clone() { - return create().mergeFrom(result); - } - - public com.google.protobuf.Descriptors.Descriptor - getDescriptorForType() { - return se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.getDescriptor(); - } - - public se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO getDefaultInstanceForType() { - return se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.getDefaultInstance(); - } - - public boolean isInitialized() { - return result.isInitialized(); - } - public se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO build() { - if (result != null && !isInitialized()) { - throw newUninitializedMessageException(result); - } - return buildPartial(); - } - - private se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO buildParsed() - throws com.google.protobuf.InvalidProtocolBufferException { - if (!isInitialized()) { - throw newUninitializedMessageException( - result).asInvalidProtocolBufferException(); - } - return buildPartial(); - } - - public se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO buildPartial() { - if (result == null) { - throw new IllegalStateException( - "build() has already been called on this Builder."); - } - se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO returnMe = result; - result = null; - return returnMe; - } - - public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO) { - return mergeFrom((se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO)other); - } else { - super.mergeFrom(other); - return this; - } - } - - public Builder mergeFrom(se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO other) { - if (other == se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.getDefaultInstance()) return this; - if (other.hasId()) { - setId(other.getId()); - } - if (other.hasName()) { - setName(other.getName()); - } - if (other.hasStatus()) { - setStatus(other.getStatus()); - } - this.mergeUnknownFields(other.getUnknownFields()); - return this; - } - - public Builder mergeFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - com.google.protobuf.UnknownFieldSet.Builder unknownFields = - com.google.protobuf.UnknownFieldSet.newBuilder( - this.getUnknownFields()); - while (true) { - int tag = input.readTag(); - switch (tag) { - case 0: - this.setUnknownFields(unknownFields.build()); - return this; - default: { - if (!parseUnknownField(input, unknownFields, - extensionRegistry, tag)) { - this.setUnknownFields(unknownFields.build()); - return this; - } - break; - } - case 8: { - setId(input.readUInt64()); - break; - } - case 18: { - setName(input.readString()); - break; - } - case 24: { - setStatus(input.readBool()); - break; - } - } - } - } - - - // required uint64 id = 1; - public boolean hasId() { - return result.hasId(); - } - public long getId() { - return result.getId(); - } - public Builder setId(long value) { - result.hasId = true; - result.id_ = value; - return this; - } - public Builder clearId() { - result.hasId = false; - result.id_ = 0L; - return this; - } - - // required string name = 2; - public boolean hasName() { - return result.hasName(); - } - public java.lang.String getName() { - return result.getName(); - } - public Builder setName(java.lang.String value) { - if (value == null) { - throw new NullPointerException(); - } - result.hasName = true; - result.name_ = value; - return this; - } - public Builder clearName() { - result.hasName = false; - result.name_ = getDefaultInstance().getName(); - return this; - } - - // required bool status = 3; - public boolean hasStatus() { - return result.hasStatus(); - } - public boolean getStatus() { - return result.getStatus(); - } - public Builder setStatus(boolean value) { - result.hasStatus = true; - result.status_ = value; - return this; - } - public Builder clearStatus() { - result.hasStatus = false; - result.status_ = false; - return this; - } - } - - static { - se.scalablesolutions.akka.api.ProtobufProtocol.getDescriptor(); - } - - static { - se.scalablesolutions.akka.api.ProtobufProtocol.internalForceInit(); - } - } - - private static com.google.protobuf.Descriptors.Descriptor - internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_descriptor; - private static - com.google.protobuf.GeneratedMessage.FieldAccessorTable - internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_fieldAccessorTable; - - public static com.google.protobuf.Descriptors.FileDescriptor - getDescriptor() { - return descriptor; - } - private static com.google.protobuf.Descriptors.FileDescriptor - descriptor; - static { - java.lang.String[] descriptorData = { - "\n4se/scalablesolutions/akka/api/Protobuf" + - "Protocol.proto\022\035se.scalablesolutions.akk" + - "a.api\"8\n\014ProtobufPOJO\022\n\n\002id\030\001 \002(\004\022\014\n\004nam" + - "e\030\002 \002(\t\022\016\n\006status\030\003 \002(\010" - }; - com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = - new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { - public com.google.protobuf.ExtensionRegistry assignDescriptors( - com.google.protobuf.Descriptors.FileDescriptor root) { - descriptor = root; - internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_descriptor = - getDescriptor().getMessageTypes().get(0); - internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_fieldAccessorTable = new - com.google.protobuf.GeneratedMessage.FieldAccessorTable( - internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_descriptor, - new java.lang.String[] { "Id", "Name", "Status", }, - se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.class, - se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.Builder.class); - return null; - } - }; - com.google.protobuf.Descriptors.FileDescriptor - .internalBuildGeneratedFileFrom(descriptorData, - new com.google.protobuf.Descriptors.FileDescriptor[] { - }, assigner); - } - - public static void internalForceInit() {} -} diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ProtobufProtocol.proto b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ProtobufProtocol.proto deleted file mode 100644 index 8140ad313a..0000000000 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ProtobufProtocol.proto +++ /dev/null @@ -1,17 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.api; - -/* - Compile with: - cd ./akka-fun-test-java/src/test/java - protoc se/scalablesolutions/akka/api/ProtobufProtocol.proto --java_out . -*/ - -message ProtobufPOJO { - required uint64 id = 1; - required string name = 2; - required bool status = 3; -} diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ProtobufSerializationTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ProtobufSerializationTest.java deleted file mode 100644 index a67560146d..0000000000 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ProtobufSerializationTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.api; - -import junit.framework.TestCase; -import se.scalablesolutions.akka.serialization.SerializerFactory; - -public class ProtobufSerializationTest extends TestCase { - public void testOutIn() throws Exception { - SerializerFactory factory = new SerializerFactory(); - ProtobufProtocol.ProtobufPOJO pojo1 = ProtobufProtocol.ProtobufPOJO.getDefaultInstance().toBuilder().setId(1).setName("protobuf").setStatus(true).build(); - - byte[] bytes = factory.getProtobuf().out(pojo1); - Object obj = factory.getProtobuf().in(bytes, pojo1.getClass()); - - assertTrue(obj instanceof ProtobufProtocol.ProtobufPOJO); - ProtobufProtocol.ProtobufPOJO pojo2 = (ProtobufProtocol.ProtobufPOJO)obj; - assertEquals(pojo1.getId(), pojo2.getId()); - assertEquals(pojo1.getName(), pojo2.getName()); - assertEquals(pojo1.getStatus(), pojo2.getStatus()); - } - - public void testDeepClone() throws Exception { - SerializerFactory factory = new SerializerFactory(); - ProtobufProtocol.ProtobufPOJO pojo1 = ProtobufProtocol.ProtobufPOJO.getDefaultInstance().toBuilder().setId(1).setName("protobuf").setStatus(true).build(); - - Object obj = factory.getProtobuf().deepClone(pojo1); - - assertTrue(obj instanceof ProtobufProtocol.ProtobufPOJO); - ProtobufProtocol.ProtobufPOJO pojo2 = (ProtobufProtocol.ProtobufPOJO)obj; - assertEquals(pojo1.getId(), pojo2.getId()); - assertEquals(pojo1.getName(), pojo2.getName()); - assertEquals(pojo1.getStatus(), pojo2.getStatus()); - } -} - diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java deleted file mode 100644 index 18c3b1ed62..0000000000 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.api; - -import se.scalablesolutions.akka.config.*; -import static se.scalablesolutions.akka.config.JavaConfig.*; - -import junit.framework.TestCase; - -public class RemotePersistentStateTest extends TestCase { - static String messageLog = ""; - - final private ActiveObjectConfigurator conf = new ActiveObjectConfigurator(); - - protected void setUp() { - PersistenceManager.init(); - conf.configure( - new RestartStrategy(new AllForOne(), 3, 5000, new Class[]{Exception.class}), - new Component[] { - new Component(PersistentStateful.class, new LifeCycle(new Permanent()), 1000000, new RemoteAddress("localhost", 9999)), - new Component(PersistentFailer.class, new LifeCycle(new Permanent()), 1000000, new RemoteAddress("localhost", 9999)) - }).supervise(); - } - - protected void tearDown() { - conf.stop(); - } - - public void testShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { - PersistentStateful stateful = conf.getInstance(PersistentStateful.class); - stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state - stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired - assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")); - } - - public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() { - PersistentStateful stateful = conf.getInstance(PersistentStateful.class); - stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state - PersistentFailer failer = conf.getInstance(PersistentFailer.class); - try { - stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "MapShouldRollBack", failer); // call failing transactionrequired method - fail("should have thrown an exception"); - } catch (RuntimeException e) { - } // expected - assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state - } - - public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { - PersistentStateful stateful = conf.getInstance(PersistentStateful.class); - int init = stateful.getVectorLength(); - stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "VectorShouldNotRollback"); // transactionrequired - assertEquals(init + 1, stateful.getVectorLength()); - } - - public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() { - PersistentStateful stateful = conf.getInstance(PersistentStateful.class); - int init = stateful.getVectorLength(); - PersistentFailer failer = conf.getInstance(PersistentFailer.class); - try { - stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method - fail("should have thrown an exception"); - } catch (RuntimeException e) { - } // expected - assertEquals(init, stateful.getVectorLength()); - } - - public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { - PersistentStateful stateful = conf.getInstance(PersistentStateful.class); - stateful.setRefState("init"); // set init state - stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired - assertEquals("new state", stateful.getRefState()); - } - - public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() { - PersistentStateful stateful = conf.getInstance(PersistentStateful.class); - stateful.setRefState("init"); // set init state - PersistentFailer failer = conf.getInstance(PersistentFailer.class); - try { - stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method - fail("should have thrown an exception"); - } catch (RuntimeException e) { - } // expected - assertEquals("init", stateful.getRefState()); // check that state is == init state - } - -} \ No newline at end of file diff --git a/akka-fun-test-java/testng.xml b/akka-fun-test-java/testng.xml deleted file mode 100644 index b894d3880b..0000000000 --- a/akka-fun-test-java/testng.xml +++ /dev/null @@ -1,16 +0,0 @@ - - - - - - - - - - - - - - - - diff --git a/akka-http/src/main/scala/AkkaClusterBroadcastFilter.scala b/akka-http/src/main/scala/AkkaClusterBroadcastFilter.scala index af5e83e71c..a4dffdea94 100644 --- a/akka-http/src/main/scala/AkkaClusterBroadcastFilter.scala +++ b/akka-http/src/main/scala/AkkaClusterBroadcastFilter.scala @@ -16,11 +16,11 @@ case class ClusterCometBroadcast(name: String, msg: AnyRef) extends ClusterComet * Enables explicit clustering of Atmosphere (Comet) resources * Annotate the endpoint which has the @Broadcast annotation with * @org.atmosphere.annotation.Cluster(Array(classOf[AkkClusterBroadcastFilter])){ name = "someUniqueName" } - * that's all folks! + * thats all folks! * Note: In the future, clustering comet will be transparent */ -class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRef] { +class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter { @BeanProperty var clusterName = "" @BeanProperty var broadcaster : Broadcaster = null diff --git a/akka-http/src/main/scala/AkkaCometServlet.scala b/akka-http/src/main/scala/AkkaCometServlet.scala index 01fe5272be..b020c473f6 100644 --- a/akka-http/src/main/scala/AkkaCometServlet.scala +++ b/akka-http/src/main/scala/AkkaCometServlet.scala @@ -20,8 +20,8 @@ class AtmosphereRestServlet extends ServletContainer with AtmosphereServletProce private val handler = new AbstractReflectorAtmosphereHandler { override def onRequest(event: AtmosphereResource[HttpServletRequest, HttpServletResponse]) { if (event ne null) { - event.getRequest.setAttribute(ReflectorServletProcessor.ATMOSPHERE_RESOURCE, event) - event.getRequest.setAttribute(ReflectorServletProcessor.ATMOSPHERE_HANDLER, this) + event.getRequest.setAttribute(AtmosphereServlet.ATMOSPHERE_RESOURCE, event) + event.getRequest.setAttribute(AtmosphereServlet.ATMOSPHERE_HANDLER, this) service(event.getRequest, event.getResponse) } } @@ -42,7 +42,10 @@ class AtmosphereRestServlet extends ServletContainer with AtmosphereServletProce *

* Used by the Akka Kernel to bootstrap REST and Comet. */ -class AkkaServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging { +class AkkaServlet extends AtmosphereServlet with Logging { + addInitParameter(AtmosphereServlet.DISABLE_ONSTATE_EVENT,"true") + addInitParameter(AtmosphereServlet.BROADCASTER_CLASS,classOf[AkkaBroadcaster].getName) + lazy val servlet = createRestServlet protected def createRestServlet : AtmosphereRestServlet = new AtmosphereRestServlet { @@ -53,9 +56,9 @@ class AkkaServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging { * Instead we specify what semantics we want in code. */ override def loadConfiguration(sc: ServletConfig) { - config = new AtmosphereConfig { supportSession = false } - setDefaultBroadcasterClassName(classOf[AkkaBroadcaster].getName) - atmosphereHandlers.put("/*", new AtmosphereServlet.AtmosphereHandlerWrapper(servlet, new AkkaBroadcaster)) + config.setSupportSession(false) + isBroadcasterSpecified = true + addAtmosphereHandler("/*", servlet, new AkkaBroadcaster) } /** diff --git a/akka-http/src/main/scala/AkkaLoader.scala b/akka-http/src/main/scala/AkkaLoader.scala index 431758e2c7..886c2c0688 100644 --- a/akka-http/src/main/scala/AkkaLoader.scala +++ b/akka-http/src/main/scala/AkkaLoader.scala @@ -47,29 +47,29 @@ class AkkaLoader extends Logging { private def printBanner = { log.info( """ - t - t t t - t t tt t - tt t t tt t - t ttttttt t ttt t - t tt ttt t ttt t - t t ttt t ttt t t - tt t ttt ttt ttt t - t t ttt ttt t tt t - t ttt ttt t t - tt ttt ttt t - ttt ttt - tttttttt ttt ttt ttt ttt tttttttt - ttt tt ttt ttt ttt ttt ttt ttt - ttt ttt ttt ttt ttt ttt ttt ttt - ttt ttt ttt ttt ttt tt ttt ttt - tttt ttttttttt tttttttt tttt - ttttttttt ttt ttt ttt ttt ttttttttt - ttt ttt ttt ttt ttt ttt ttt ttt - ttt ttt ttt ttt ttt ttt ttt ttt - ttt tt ttt ttt ttt ttt ttt ttt + t + t t t + t t tt t + tt t t tt t + t ttttttt t ttt t + t tt ttt t ttt t + t t ttt t ttt t t + tt t ttt ttt ttt t + t t ttt ttt t tt t + t ttt ttt t t + tt ttt ttt t + ttt ttt tttttttt ttt ttt ttt ttt tttttttt - + ttt tt ttt ttt ttt ttt ttt ttt + ttt ttt ttt ttt ttt ttt ttt ttt + ttt ttt ttt ttt ttt tt ttt ttt + tttt ttttttttt tttttttt tttt + ttttttttt ttt ttt ttt ttt ttttttttt + ttt ttt ttt ttt ttt ttt ttt ttt + ttt ttt ttt ttt ttt ttt ttt ttt + ttt tt ttt ttt ttt ttt ttt ttt + tttttttt ttt ttt ttt ttt tttttttt + ================================================== """) log.info(" Running version %s", Config.VERSION) diff --git a/akka-kernel/src/main/scala/EmbeddedAppServer.scala b/akka-kernel/src/main/scala/EmbeddedAppServer.scala index 41e6dd44ae..8d9982c7e2 100644 --- a/akka-kernel/src/main/scala/EmbeddedAppServer.scala +++ b/akka-kernel/src/main/scala/EmbeddedAppServer.scala @@ -50,14 +50,14 @@ trait EmbeddedAppServer extends Bootable with Logging { Thread.currentThread.setContextClassLoader(applicationLoader.get) super.init(sc) } - finally { + finally { Thread.currentThread.setContextClassLoader(cl) } } }) adapter.setContextPath(uri.getPath) - adapter.addInitParameter("cometSupport", + adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport") adapter.addInitParameter("com.sun.jersey.config.property.resourceConfigClass", "com.sun.jersey.api.core.PackagesResourceConfig") @@ -65,7 +65,7 @@ trait EmbeddedAppServer extends Bootable with Logging { config.getList("akka.rest.resource_packages").mkString(";") ) adapter.addInitParameter("com.sun.jersey.spi.container.ResourceFilters", - config.getList("akka.rest.filters").mkString(",") + config.getList("akka.rest.filters").mkString(",") ) if (HOME.isDefined) adapter.addRootFolder(HOME.get + "/deploy/root") diff --git a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala index b67a0e9256..f393f4a162 100644 --- a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala +++ b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorageBackend.scala @@ -42,7 +42,7 @@ private[akka] object CassandraStorageBackend extends case "ALL" => ConsistencyLevel.ALL case "ANY" => ConsistencyLevel.ANY case unknown => throw new IllegalArgumentException( - "Cassandra consistency level [" + unknown + "] is not supported." + + "Cassandra consistency level [" + unknown + "] is not supported." + "\n\tExpected one of [ZERO, ONE, QUORUM, DCQUORUM, DCQUORUMSYNC, ALL, ANY] in the akka.conf configuration file.") } } @@ -105,9 +105,9 @@ private[akka] object CassandraStorageBackend extends } } - def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = + def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = elements.foreach(insertVectorStorageEntryFor(name, _)) - + def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = { val columnPath = new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family) columnPath.setColumn(intToBytes(index)) diff --git a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala index 6746fca529..648dec2be3 100644 --- a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala +++ b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala @@ -171,4 +171,4 @@ object EmbeddedCassandraService { def start: Unit = {} } -*/ \ No newline at end of file +*/ diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index dcb1f4f85e..6a0eb9a8d8 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -30,7 +30,7 @@ class StorageException(message: String) extends RuntimeException(message) *

  * val myMap = CassandraStorage.getMap(id)
  * 
- * + * * Example Java usage: *
  * PersistentMap myMap = MongoStorage.newMap();
@@ -72,7 +72,7 @@ trait Storage {
 }
 
 /**
- * Implementation of PersistentMap for every concrete 
+ * Implementation of PersistentMap for every concrete
  * storage will have the same workflow. This abstracts the workflow.
  *
  * Subclasses just need to provide the actual concrete instance for the
@@ -81,7 +81,7 @@ trait Storage {
  * @author Jonas Bonér
  */
 trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
-  with Transactional with Committable with Logging {
+  with Transactional with Committable with Abortable with Logging {
   protected val newAndUpdatedEntries = TransactionalState.newMap[K, V]
   protected val removedEntries = TransactionalState.newVector[K]
   protected val shouldClearOnCommit = TransactionalRef[Boolean]()
@@ -97,6 +97,12 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
     removedEntries.clear
   }
 
+  def abort = {
+    newAndUpdatedEntries.clear
+    removedEntries.clear
+    shouldClearOnCommit.swap(false)
+  }
+
   def -=(key: K) = {
     remove(key)
     this
@@ -111,23 +117,23 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
     put(key, value)
     this
   }
-  
+
   override def put(key: K, value: V): Option[V] = {
     register
     newAndUpdatedEntries.put(key, value)
   }
- 
-  override def update(key: K, value: V) = { 
+
+  override def update(key: K, value: V) = {
     register
     newAndUpdatedEntries.update(key, value)
   }
-  
+
   override def remove(key: K) = {
     register
     removedEntries.add(key)
     newAndUpdatedEntries.get(key)
   }
-  
+
   def slice(start: Option[K], count: Int): List[Tuple2[K, V]] =
     slice(start, None, count)
 
@@ -135,11 +141,11 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
     storage.getMapStorageRangeFor(uuid, start, finish, count)
   } catch { case e: Exception => Nil }
 
-  override def clear = { 
+  override def clear = {
     register
     shouldClearOnCommit.swap(true)
   }
-  
+
   override def contains(key: K): Boolean = try {
     newAndUpdatedEntries.contains(key) ||
     storage.getMapStorageEntryFor(uuid, key).isDefined
@@ -157,9 +163,9 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
       storage.getMapStorageEntryFor(uuid, key)
     } catch { case e: Exception => None }
   }
-  
+
   def iterator = elements
-  
+
   override def elements: Iterator[Tuple2[K, V]]  = {
     new Iterator[Tuple2[K, V]] {
       private val originalList: List[Tuple2[K, V]] = try {
@@ -167,10 +173,10 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
       } catch {
         case e: Throwable => Nil
       }
-      private var elements = newAndUpdatedEntries.toList union originalList.reverse 
+      private var elements = newAndUpdatedEntries.toList union originalList.reverse
       override def next: Tuple2[K, V]= synchronized {
         val element = elements.head
-        elements = elements.tail        
+        elements = elements.tail
         element
       }
       override def hasNext: Boolean = synchronized { !elements.isEmpty }
@@ -188,7 +194,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
  *
  * @author Jonas Bonér
  */
-trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committable {
+trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committable with Abortable {
   protected val newElems = TransactionalState.newVector[T]
   protected val updatedElems = TransactionalState.newMap[Int, T]
   protected val removedElems = TransactionalState.newVector[T]
@@ -203,13 +209,20 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
     updatedElems.clear
   }
 
+  def abort = {
+    newElems.clear
+    updatedElems.clear
+    removedElems.clear
+    shouldClearOnCommit.swap(false)
+  }
+
   def +(elem: T) = add(elem)
-  
+
   def add(elem: T) = {
     register
     newElems + elem
   }
- 
+
   def apply(index: Int): T = get(index)
 
   def get(index: Int): T = {
@@ -218,7 +231,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
   }
 
   override def slice(start: Int, finish: Int): IndexedSeq[T] = slice(Some(start), Some(finish))
-  
+
   def slice(start: Option[Int], finish: Option[Int], count: Int = 0): IndexedSeq[T] = {
     val buffer = new scala.collection.mutable.ArrayBuffer[T]
     storage.getVectorStorageRangeFor(uuid, start, finish, count).foreach(buffer.append(_))
@@ -262,21 +275,23 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
  *
  * @author Jonas Bonér
  */
-trait PersistentRef[T] extends Transactional with Committable {
+trait PersistentRef[T] extends Transactional with Committable with Abortable {
   protected val ref = new TransactionalRef[T]
-  
+
   val storage: RefStorageBackend[T]
 
   def commit = if (ref.isDefined) {
     storage.insertRefStorageFor(uuid, ref.get.get)
-    ref.swap(null.asInstanceOf[T]) 
+    ref.swap(null.asInstanceOf[T])
   }
 
+  def abort = ref.swap(null.asInstanceOf[T])
+
   def swap(elem: T) = {
     register
     ref.swap(elem)
   }
-  
+
   def get: Option[T] = if (ref.isDefined) ref.get else storage.getRefStorageFor(uuid)
 
   def isDefined: Boolean = ref.isDefined || storage.getRefStorageFor(uuid).isDefined
@@ -294,7 +309,7 @@ trait PersistentRef[T] extends Transactional with Committable {
 }
 
 /**
- * Implementation of PersistentQueue for every concrete 
+ * Implementation of PersistentQueue for every concrete
  * storage will have the same workflow. This abstracts the workflow.
  * 

* Enqueue is simpler, we just have to record the operation in a local @@ -319,7 +334,7 @@ trait PersistentRef[T] extends Transactional with Committable { * @author Debasish Ghosh */ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] - with Transactional with Committable with Logging { + with Transactional with Committable with Abortable with Logging { sealed trait QueueOp case object ENQ extends QueueOp @@ -356,8 +371,17 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] enqueuedNDequeuedEntries.clear localQ.swap(Queue.empty) pickMeForDQ.swap(0) + shouldClearOnCommit.swap(false) } + def abort = { + enqueuedNDequeuedEntries.clear + shouldClearOnCommit.swap(false) + localQ.swap(Queue.empty) + pickMeForDQ.swap(0) + } + + override def enqueue(elems: A*) { register elems.foreach(e => { @@ -382,19 +406,17 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] val (a, q) = localQ.get.get.dequeue localQ.swap(q) a - } - else - throw new NoSuchElementException("trying to dequeue from empty queue") + } else throw new NoSuchElementException("trying to dequeue from empty queue") } } - override def clear = { + override def clear = { register shouldClearOnCommit.swap(true) localQ.swap(Queue.empty) pickMeForDQ.swap(0) } - + override def size: Int = try { storage.size(uuid) + localQ.get.get.length } catch { case e: Exception => 0 } @@ -402,11 +424,11 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] override def isEmpty: Boolean = size == 0 - override def +=(elem: A) = { + override def +=(elem: A) = { enqueue(elem) this } - def ++=(elems: Iterator[A]) = { + def ++=(elems: Iterator[A]) = { enqueue(elems.toList: _*) this } @@ -428,7 +450,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] * Implements a template for a concrete persistent transactional sorted set based storage. *

* Sorting is done based on a zscore. But the computation of zscore has been kept - * outside the abstraction. + * outside the abstraction. *

* zscore can be implemented in a variety of ways by the calling class: *

@@ -445,7 +467,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
  * class Foo {
  *   //..
  * }
- * 
+ *
  * implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable {
  *   def toZScore = {
  *     //..
@@ -457,9 +479,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
  *
  * @author 
  */
-trait PersistentSortedSet[A] 
-  extends Transactional 
-  with Committable {
+trait PersistentSortedSet[A] extends Transactional with Committable with Abortable {
 
   protected val newElems = TransactionalState.newMap[A, Float]
   protected val removedElems = TransactionalState.newVector[A]
@@ -473,6 +493,11 @@ trait PersistentSortedSet[A]
     removedElems.clear
   }
 
+  def abort = {
+    newElems.clear
+    removedElems.clear
+  }
+
   def +(elem: A, score: Float) = add(elem, score)
 
   def add(elem: A, score: Float) = {
@@ -501,7 +526,7 @@ trait PersistentSortedSet[A]
       }
     }
   }
- 
+
   def size: Int = newElems.size + storage.zcard(uuid) - removedElems.size
 
   def zscore(elem: A): Float = {
@@ -516,9 +541,9 @@ trait PersistentSortedSet[A]
   implicit def order(x: (A, Float)) = new Ordered[(A, Float)] {
     def compare(that: (A, Float)) = x._2 compare that._2
   }
-  
+
   implicit def ordering = new scala.math.Ordering[(A,Float)] {
-    def compare(x: (A, Float),y : (A,Float)) = x._2 compare y._2   
+    def compare(x: (A, Float),y : (A,Float)) = x._2 compare y._2
   }
 
 
@@ -531,7 +556,7 @@ trait PersistentSortedSet[A]
 
     // -1 means the last element, -2 means the second last
     val s = if (start < 0) start + l else start
-    val e = 
+    val e =
       if (end < 0) end + l
       else if (end >= l) (l - 1)
       else end
diff --git a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala
index ab0cfaf4d3..a5226eb1a4 100644
--- a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala
+++ b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala
@@ -26,7 +26,7 @@ trait VectorStorageBackend[T] extends StorageBackend {
   def updateVectorStorageEntryFor(name: String, index: Int, elem: T)
   def getVectorStorageEntryFor(name: String, index: Int): T
   def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[T]
-  def getVectorStorageSizeFor(name: String): Int 
+  def getVectorStorageSizeFor(name: String): Int
 }
 
 // for Ref
@@ -47,17 +47,17 @@ trait RefStorageBackend[T] extends StorageBackend {
 trait QueueStorageBackend[T] extends StorageBackend {
   // add to the end of the queue
   def enqueue(name: String, item: T): Boolean
-  
+
   // pop from the front of the queue
   def dequeue(name: String): Option[T]
-  
+
   // get the size of the queue
   def size(name: String): Int
-  
+
   // return an array of items currently stored in the queue
   // start is the item to begin, count is how many items to return
   def peek(name: String, start: Int, count: Int): List[T]
-  
+
   // completely delete the queue
   def remove(name: String): Boolean
 }
@@ -65,19 +65,19 @@ trait QueueStorageBackend[T] extends StorageBackend {
 trait SortedSetStorageBackend[T] extends StorageBackend {
   // add item to sorted set identified by name
   def zadd(name: String, zscore: String, item: T): Boolean
-  
+
   // remove item from sorted set identified by name
   def zrem(name: String, item: T): Boolean
-  
+
   // cardinality of the set identified by name
   def zcard(name: String): Int
-  
+
   // zscore of the item from sorted set identified by name
   def zscore(name: String, item: T): Option[Float]
-  
+
   // zrange from the sorted set identified by name
   def zrange(name: String, start: Int, end: Int): List[T]
 
   // zrange with score from the sorted set identified by name
-  def zrangeWithScore(name: String, start: Int, end: Int): List[(T, Float)] 
+  def zrangeWithScore(name: String, start: Int, end: Int): List[(T, Float)]
 }
diff --git a/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorageBackend.scala b/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorageBackend.scala
index 49f46db134..d5581b373b 100644
--- a/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorageBackend.scala
+++ b/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorageBackend.scala
@@ -280,7 +280,7 @@ private[akka] object MongoStorageBackend extends
       }
     val currentList = dbobj.get(VALUE).asInstanceOf[JArrayList[AnyRef]]
     currentList.set(index, serializer.out(elem))
-    coll.update(q, 
+    coll.update(q,
       new BasicDBObject().append(KEY, name).append(VALUE, currentList))
   }
 
diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
index a09be9bd51..18dd4ce94d 100644
--- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
+++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
@@ -1,10 +1,12 @@
 package se.scalablesolutions.akka.persistence.redis
 
+import org.scalatest.junit.JUnitSuite
+
 import org.junit.{Test, Before}
 import org.junit.Assert._
 
-import se.scalablesolutions.akka.actor.{Actor, ActorRef, Transactor}
-import Actor._
+import se.scalablesolutions.akka.actor.{ActorRef, Transactor}
+import se.scalablesolutions.akka.actor.Actor._
 
 /**
  * A persistent actor based on Redis storage.
@@ -23,10 +25,10 @@ case class Balance(accountNo: String)
 case class Debit(accountNo: String, amount: BigInt, failer: ActorRef)
 case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: ActorRef)
 case class Credit(accountNo: String, amount: BigInt)
-case class Log(start: Int, finish: Int)
 case object LogSize
 
 class AccountActor extends Transactor {
+  import self._
   private lazy val accountState = RedisStorage.newMap
   private lazy val txnLog = RedisStorage.newVector
   //timeout = 5000
@@ -35,7 +37,7 @@ class AccountActor extends Transactor {
     // check balance
     case Balance(accountNo) =>
       txnLog.add("Balance:%s".format(accountNo).getBytes)
-      self.reply(BigInt(new String(accountState.get(accountNo.getBytes).get)))
+      reply(BigInt(new String(accountState.get(accountNo.getBytes).get)))
 
     // debit amount: can fail
     case Debit(accountNo, amount, failer) =>
@@ -49,7 +51,7 @@ class AccountActor extends Transactor {
       accountState.put(accountNo.getBytes, (m - amount).toString.getBytes)
       if (amount > m)
         failer !! "Failure"
-      else self.reply(m - amount)
+      reply(m - amount)
 
     // many debits: can fail
     // demonstrates true rollback even if multiple puts have been done
@@ -67,7 +69,7 @@ class AccountActor extends Transactor {
         accountState.put(accountNo.getBytes, (m - bal).toString.getBytes)
       }
       if (bal > m) failer !! "Failure"
-      self.reply(m - bal)
+      reply(m - bal)
 
     // credit amount
     case Credit(accountNo, amount) =>
@@ -79,13 +81,10 @@ class AccountActor extends Transactor {
         case None => 0
       }
       accountState.put(accountNo.getBytes, (m + amount).toString.getBytes)
-      self.reply(m + amount)
+      reply(m + amount)
 
     case LogSize =>
-      self.reply(txnLog.length.asInstanceOf[AnyRef])
-
-    case Log(start, finish) =>
-      self.reply(txnLog.slice(start, finish))
+      reply(txnLog.length.asInstanceOf[AnyRef])
   }
 }
 
@@ -97,62 +96,35 @@ class AccountActor extends Transactor {
   }
 }
 
-import org.scalatest.junit.JUnitSuite
 class RedisPersistentActorSpec extends JUnitSuite {
   @Test
-  def testSuccessfulDebit {
+  def testSuccessfulDebit = {
     val bactor = actorOf[AccountActor]
     bactor.start
     val failer = actorOf[PersistentFailerActor]
     failer.start
+    bactor !! Credit("a-123", 5000)
+    bactor !! Debit("a-123", 3000, failer)
+    assertEquals(BigInt(2000), (bactor !! Balance("a-123")).get)
 
-    val acc = "a-123"
+    bactor !! Credit("a-123", 7000)
+    assertEquals(BigInt(9000), (bactor !! Balance("a-123")).get)
 
-    println("----------- SIZE 0 " + (bactor !! LogSize).get)
+    bactor !! Debit("a-123", 8000, failer)
+    assertEquals(BigInt(1000), (bactor !! Balance("a-123")).get)
 
-    bactor !! Credit(acc, 5000)
-    println("----------- SIZE 1 " + (bactor !! LogSize).get)
-
-    println(bactor !! Balance(acc))
-    println("----------- SIZE 2 " + (bactor !! LogSize).get)
-
-    bactor !! Debit(acc, 3000, failer)
-    println("----------- SIZE 3 " + (bactor !! LogSize).get)
-    
-    assertEquals(BigInt(2000), (bactor !! Balance(acc)).get)
-    println("----------- SIZE 4 " + (bactor !! LogSize).get)
-
-    bactor !! Credit(acc, 7000)
-    println("----------- SIZE 5 " + (bactor !! LogSize).get)
-
-    assertEquals(BigInt(9000), (bactor !! Balance(acc)).get)
-    println("----------- SIZE 6 " + (bactor !! LogSize).get)
- 
-    bactor !! Debit(acc, 8000, failer)
-    println("----------- SIZE 7 " + (bactor !! LogSize).get)
-
-    assertEquals(BigInt(1000), (bactor !! Balance(acc)).get)
-    println("----------- SIZE 8 " + (bactor !! LogSize).get)
-    
-    assert(7 === (bactor !! LogSize).get) // Not counting the failed transaction => 7
-
-    import scala.collection.mutable.ArrayBuffer
-    assert((bactor !! Log(0, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 7)
-    assert((bactor !! Log(0, 0)).get.asInstanceOf[ArrayBuffer[String]].size == 0)
-    assert((bactor !! Log(1, 2)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
-    assert((bactor !! Log(6, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
-    assert((bactor !! Log(0, 1)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
+    val c: Int = (bactor !! LogSize).get
+    assertTrue(7 == c)
   }
 
-  /**
   @Test
-  def testUnsuccessfulDebit {
-    val bactor = actorOf(new AccountActor)
+  def testUnsuccessfulDebit = {
+    val bactor = actorOf[AccountActor]
     bactor.start
     bactor !! Credit("a-123", 5000)
     assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get)
 
-    val failer = actorOf(new PersistentFailerActor)
+    val failer = actorOf[PersistentFailerActor]
     failer.start
     try {
       bactor !! Debit("a-123", 7000, failer)
@@ -162,19 +134,19 @@ class RedisPersistentActorSpec extends JUnitSuite {
     assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get)
 
     // should not count the failed one
-    // val c: Int = (bactor !! LogSize).get
-    // assertTrue(3 == c)
+    val c: Int = (bactor !! LogSize).get
+    assertTrue(3 == c)
   }
 
   @Test
-  def testUnsuccessfulMultiDebit {
-    val bactor = actorOf(new AccountActor)
+  def testUnsuccessfulMultiDebit = {
+    val bactor = actorOf[AccountActor]
     bactor.start
     bactor !! Credit("a-123", 5000)
 
     assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get)
 
-    val failer = actorOf(new PersistentFailerActor)
+    val failer = actorOf[PersistentFailerActor]
     failer.start
     try {
       bactor !! MultiDebit("a-123", List(500, 2000, 1000, 3000), failer)
@@ -184,8 +156,7 @@ class RedisPersistentActorSpec extends JUnitSuite {
     assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get)
 
     // should not count the failed one
-    // val c: Int = (bactor !! LogSize).get
-    // assertTrue(3 == c)
+    val c: Int = (bactor !! LogSize).get
+    assertTrue(3 == c)
   }
-**/
 }
diff --git a/akka-samples/akka-sample-ants/src/main/scala/Ants.scala b/akka-samples/akka-sample-ants/src/main/scala/Ants.scala
index 85757f25e8..05fe245b10 100644
--- a/akka-samples/akka-sample-ants/src/main/scala/Ants.scala
+++ b/akka-samples/akka-sample-ants/src/main/scala/Ants.scala
@@ -87,7 +87,7 @@ object World {
     pingEvery(evaporator, EvapMillis)
   }
 
-  private def pingEvery(actor: ActorRef, millis: Long) = 
+  private def pingEvery(actor: ActorRef, millis: Long) =
     Scheduler.schedule(actor, "ping", Config.StartDelay, millis, TimeUnit.MILLISECONDS)
 }
 
diff --git a/akka-samples/akka-sample-camel/src/main/scala/Application1.scala b/akka-samples/akka-sample-camel/src/main/scala/Application1.scala
index dfff0e0539..6dcb437992 100644
--- a/akka-samples/akka-sample-camel/src/main/scala/Application1.scala
+++ b/akka-samples/akka-sample-camel/src/main/scala/Application1.scala
@@ -26,4 +26,4 @@ object Application1 {
     println(actor2 !! Message("actor2"))
   }
 
-}
\ No newline at end of file
+}
diff --git a/akka-samples/akka-sample-camel/src/main/scala/Application2.scala b/akka-samples/akka-sample-camel/src/main/scala/Application2.scala
index e01b510a71..8a789d13bf 100644
--- a/akka-samples/akka-sample-camel/src/main/scala/Application2.scala
+++ b/akka-samples/akka-sample-camel/src/main/scala/Application2.scala
@@ -19,4 +19,4 @@ object Application2 {
     RemoteNode.start("localhost", 7777)
     RemoteNode.register("remote2", actorOf[RemoteActor2].start)
   }
-}
\ No newline at end of file
+}
diff --git a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala
index 29e65c3051..138e4c364d 100644
--- a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala
+++ b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala
@@ -77,4 +77,4 @@ class CustomRouteBuilder extends RouteBuilder {
       }
     })
   }
-}
\ No newline at end of file
+}
diff --git a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala
index f193efd217..f244f8eeef 100644
--- a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala
+++ b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala
@@ -27,17 +27,17 @@ First we need to download, build and start up Redis:
 4. Run: ‘./redis-server’.
 For details on how to set up Redis server have a look at http://code.google.com/p/redis/wiki/QuickStart.
 
-Then to run the sample: 
+Then to run the sample:
 
 1. Fire up two shells. For each of them:
   - Step down into to the root of the Akka distribution.
   - Set 'export AKKA_HOME=.
   - Run 'sbt console' to start up a REPL (interpreter).
-2. In the first REPL you get execute: 
+2. In the first REPL you get execute:
   - scala> import sample.chat._
   - scala> import se.scalablesolutions.akka.actor.Actor._
   - scala> val chatService = actorOf[ChatService].start
-3. In the second REPL you get execute: 
+3. In the second REPL you get execute:
     - scala> import sample.chat._
     - scala> Runner.run
 4. See the chat simulation run.
@@ -60,12 +60,12 @@ case class ChatMessage(from: String, message: String) extends Event
 /**
  * Chat client.
  */
-class ChatClient(val name: String) { 
+class ChatClient(val name: String) {
   val chat = RemoteClient.actorFor("chat:service", "localhost", 9999)
 
-  def login =                 chat ! Login(name) 
-  def logout =                chat ! Logout(name)  
-  def post(message: String) = chat ! ChatMessage(name, name + ": " + message)  
+  def login =                 chat ! Login(name)
+  def logout =                chat ! Logout(name)
+  def post(message: String) = chat ! ChatMessage(name, name + ": " + message)
   def chatLog: ChatLog =     (chat !! GetChatLog(name)).getOrElse(throw new Exception("Couldn't get the chat log from ChatServer"))
 }
 
@@ -75,15 +75,15 @@ class ChatClient(val name: String) {
 class Session(user: String, storage: ActorRef) extends Actor {
   private val loginTime = System.currentTimeMillis
   private var userLog: List[String] = Nil
-  
+
   log.info("New session for user [%s] has been created at [%s]", user, loginTime)
 
   def receive = {
-    case msg @ ChatMessage(from, message) => 
+    case msg @ ChatMessage(from, message) =>
       userLog ::= message
       storage ! msg
-      
-    case msg @ GetChatLog(_) => 
+
+    case msg @ GetChatLog(_) =>
       storage forward msg
   }
 }
@@ -97,24 +97,24 @@ trait ChatStorage extends Actor
  * Redis-backed chat storage implementation.
  */
 class RedisChatStorage extends ChatStorage {
-  self.lifeCycle = Some(LifeCycle(Permanent))    
+  self.lifeCycle = Some(LifeCycle(Permanent))
   val CHAT_LOG = "akka.chat.log"
-  
+
   private var chatLog = atomic { RedisStorage.getVector(CHAT_LOG) }
 
   log.info("Redis-based chat storage is starting up...")
 
   def receive = {
-    case msg @ ChatMessage(from, message) => 
+    case msg @ ChatMessage(from, message) =>
       log.debug("New chat message [%s]", message)
       atomic { chatLog + message.getBytes("UTF-8") }
 
-    case GetChatLog(_) => 
+    case GetChatLog(_) =>
       val messageList = atomic { chatLog.map(bytes => new String(bytes, "UTF-8")).toList }
       self.reply(ChatLog(messageList))
   }
-  
-  override def postRestart(reason: Throwable) = chatLog = RedisStorage.getVector(CHAT_LOG)  
+
+  override def postRestart(reason: Throwable) = chatLog = RedisStorage.getVector(CHAT_LOG)
 }
 
 /**
@@ -122,27 +122,27 @@ class RedisChatStorage extends ChatStorage {
  * 

* Uses self-type annotation (this: Actor =>) to declare that it needs to be mixed in with an Actor. */ -trait SessionManagement { this: Actor => - +trait SessionManagement { this: Actor => + val storage: ActorRef // needs someone to provide the ChatStorage val sessions = new HashMap[String, ActorRef] - + protected def sessionManagement: Receive = { - case Login(username) => + case Login(username) => log.info("User [%s] has logged in", username) val session = actorOf(new Session(username, storage)) session.start sessions += (username -> session) - - case Logout(username) => + + case Logout(username) => log.info("User [%s] has logged out", username) val session = sessions(username) session.stop - sessions -= username - } - - protected def shutdownSessions = - sessions.foreach { case (_, session) => session.stop } + sessions -= username + } + + protected def shutdownSessions = + sessions.foreach { case (_, session) => session.stop } } /** @@ -152,7 +152,7 @@ trait SessionManagement { this: Actor => */ trait ChatManagement { this: Actor => val sessions: HashMap[String, ActorRef] // needs someone to provide the Session map - + protected def chatManagement: Receive = { case msg @ ChatMessage(from, _) => sessions(from) ! msg case msg @ GetChatLog(from) => sessions(from) forward msg @@ -172,20 +172,20 @@ trait RedisChatStorageFactory { this: Actor => trait ChatServer extends Actor { self.faultHandler = Some(OneForOneStrategy(5, 5000)) self.trapExit = List(classOf[Exception]) - + val storage: ActorRef log.info("Chat server is starting up...") // actor message handler def receive = sessionManagement orElse chatManagement - + // abstract methods to be defined somewhere else protected def chatManagement: Receive - protected def sessionManagement: Receive + protected def sessionManagement: Receive protected def shutdownSessions: Unit - override def shutdown = { + override def shutdown = { log.info("Chat server is shutting down...") shutdownSessions self.unlink(storage) @@ -200,10 +200,10 @@ trait ChatServer extends Actor { * val chatService = Actor.actorOf[ChatService].start *

*/ -class ChatService extends - ChatServer with - SessionManagement with - ChatManagement with +class ChatService extends + ChatServer with + SessionManagement with + ChatManagement with RedisChatStorageFactory { override def init = { RemoteNode.start("localhost", 9999) @@ -217,7 +217,7 @@ class ChatService extends object Runner { def run = { val client = new ChatClient("jonas") - + client.login client.post("Hi there") @@ -228,4 +228,4 @@ object Runner { client.logout } -} \ No newline at end of file +} diff --git a/akka-samples/akka-sample-lift/src/main/scala/bootstrap/liftweb/Boot.scala b/akka-samples/akka-sample-lift/src/main/scala/bootstrap/liftweb/Boot.scala index f8e4f15bd9..0f4a0e9020 100644 --- a/akka-samples/akka-sample-lift/src/main/scala/bootstrap/liftweb/Boot.scala +++ b/akka-samples/akka-sample-lift/src/main/scala/bootstrap/liftweb/Boot.scala @@ -23,7 +23,7 @@ class Boot extends Logging { def boot { // where to search snippet LiftRules.addToPackages("sample.lift") - + LiftRules.httpAuthProtectedResource.prepend { case (Req("liftcount" :: Nil, _, _)) => Full(AuthRole("admin")) } @@ -35,9 +35,9 @@ class Boot extends Logging { true } } - + LiftRules.passNotFoundToChain = true - + val factory = SupervisorFactory( SupervisorConfig( RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), @@ -49,7 +49,7 @@ class Boot extends Logging { LifeCycle(Permanent)) :: Nil)) factory.newInstance.start - + // Build SiteMap // val entries = Menu(Loc("Home", List("index"), "Home")) :: Nil // LiftRules.setSiteMap(SiteMap(entries:_*)) diff --git a/akka-samples/akka-sample-lift/src/test/scala/LiftConsole.scala b/akka-samples/akka-sample-lift/src/test/scala/LiftConsole.scala index a5aa1698e8..43296bc1f4 100644 --- a/akka-samples/akka-sample-lift/src/test/scala/LiftConsole.scala +++ b/akka-samples/akka-sample-lift/src/test/scala/LiftConsole.scala @@ -13,4 +13,4 @@ object LiftConsole { exit(0) } } -*/ \ No newline at end of file +*/ diff --git a/akka-samples/akka-sample-pubsub/src/main/scala/RedisPubSub.scala b/akka-samples/akka-sample-pubsub/src/main/scala/RedisPubSub.scala index 44366d31a4..ca3148aab7 100644 --- a/akka-samples/akka-sample-pubsub/src/main/scala/RedisPubSub.scala +++ b/akka-samples/akka-sample-pubsub/src/main/scala/RedisPubSub.scala @@ -10,7 +10,7 @@ import se.scalablesolutions.akka.actor.Actor._ /** * Sample Akka application for Redis PubSub - * + * * Prerequisite: Need Redis Server running (the version that supports pubsub) *
  * 1. Download redis from http://github.com/antirez/redis
@@ -65,7 +65,7 @@ object Sub {
   val r = new RedisClient("localhost", 6379)
   val s = actorOf(new Subscriber(r))
   s.start
-  s ! Register(callback) 
+  s ! Register(callback)
 
   def sub(channels: String*) = {
     s ! Subscribe(channels.toArray)
@@ -78,29 +78,29 @@ object Sub {
   def callback(pubsub: PubSubMessage) = pubsub match {
     case S(channel, no) => println("subscribed to " + channel + " and count = " + no)
     case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no)
-    case M(channel, msg) => 
+    case M(channel, msg) =>
       msg match {
         // exit will unsubscribe from all channels and stop subscription service
-        case "exit" => 
+        case "exit" =>
           println("unsubscribe all ..")
           r.unsubscribe
 
         // message "+x" will subscribe to channel x
-        case x if x startsWith "+" => 
+        case x if x startsWith "+" =>
           val s: Seq[Char] = x
           s match {
             case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => }
           }
 
         // message "-x" will unsubscribe from channel x
-        case x if x startsWith "-" => 
+        case x if x startsWith "-" =>
           val s: Seq[Char] = x
           s match {
             case Seq('-', rest @ _*) => r.unsubscribe(rest.toString)
           }
 
         // other message receive
-        case x => 
+        case x =>
           println("received message on channel " + channel + " as : " + x)
       }
   }
diff --git a/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala b/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala
index 9070d4a7f8..24f81872f7 100644
--- a/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala
+++ b/akka-samples/akka-sample-remote/src/main/scala/ClientManagedRemoteActorSample.scala
@@ -11,7 +11,7 @@ import se.scalablesolutions.akka.util.Logging
 
 class RemoteHelloWorldActor extends RemoteActor("localhost", 9999) {
   def receive = {
-    case "Hello" => 
+    case "Hello" =>
       log.info("Received 'Hello'")
       self.reply("World")
   }
@@ -27,7 +27,7 @@ object ClientManagedRemoteActorServer extends Logging {
 }
 
 object ClientManagedRemoteActorClient extends Logging {
-  
+
   def run = {
     val actor = actorOf[RemoteHelloWorldActor].start
     log.info("Remote actor created, moved to the server")
diff --git a/akka-samples/akka-sample-remote/src/main/scala/ServerManagedRemoteActorSample.scala b/akka-samples/akka-sample-remote/src/main/scala/ServerManagedRemoteActorSample.scala
index 87253af24b..2671d9e25e 100644
--- a/akka-samples/akka-sample-remote/src/main/scala/ServerManagedRemoteActorSample.scala
+++ b/akka-samples/akka-sample-remote/src/main/scala/ServerManagedRemoteActorSample.scala
@@ -11,7 +11,7 @@ import se.scalablesolutions.akka.util.Logging
 
 class HelloWorldActor extends Actor {
   def receive = {
-    case "Hello" => 
+    case "Hello" =>
       log.info("Received 'Hello'")
       self.reply("World")
   }
@@ -30,7 +30,7 @@ object ServerManagedRemoteActorServer extends Logging {
 }
 
 object ServerManagedRemoteActorClient extends Logging {
-  
+
   def run = {
     val actor = RemoteClient.actorFor("hello-service", "localhost", 9999)
     log.info("Remote client created")
diff --git a/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/Boot.java b/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/Boot.java
index 2f97b4ce92..60eb4f11af 100644
--- a/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/Boot.java
+++ b/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/Boot.java
@@ -1,13 +1,16 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB 
+ */
+
 package sample.rest.java;
 
 import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
 import static se.scalablesolutions.akka.config.JavaConfig.*;
 
 public class Boot {
-  final private ActiveObjectConfigurator manager = new ActiveObjectConfigurator();
-
-  public Boot() throws Exception  {
-    manager.configure(
+  public final static ActiveObjectConfigurator configurator = new ActiveObjectConfigurator();
+  static {
+    configurator.configure(
       new RestartStrategy(new OneForOne(), 3, 5000, new Class[]{Exception.class}),
         new Component[] {
           new Component(
@@ -19,5 +22,5 @@ public class Boot {
             new LifeCycle(new Permanent()),
             1000)
         }).supervise();
-    }
+  }
 }
diff --git a/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/PersistentSimpleService.java b/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/PersistentSimpleService.java
index 82d4bc5ea5..1108fcdb63 100644
--- a/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/PersistentSimpleService.java
+++ b/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/PersistentSimpleService.java
@@ -4,10 +4,6 @@
 
 package sample.rest.java;
 
-import javax.ws.rs.Path;
-import javax.ws.rs.GET;
-import javax.ws.rs.Produces;
-
 import se.scalablesolutions.akka.actor.annotation.transactionrequired;
 import se.scalablesolutions.akka.actor.annotation.prerestart;
 import se.scalablesolutions.akka.actor.annotation.postrestart;
@@ -16,14 +12,6 @@ import se.scalablesolutions.akka.persistence.cassandra.CassandraStorage;
 
 import java.nio.ByteBuffer;
 
-/**
- * Try service out by invoking (multiple times):
- * 
- * curl http://localhost:9998/persistentjavacount
- * 
- * Or browse to the URL from a web browser. - */ -@Path("/persistentjavacount") @transactionrequired public class PersistentSimpleService { private String KEY = "COUNTER"; @@ -31,8 +19,6 @@ public class PersistentSimpleService { private boolean hasStartedTicking = false; private PersistentMap storage; - @GET - @Produces({"application/html"}) public String count() { if (storage == null) storage = CassandraStorage.newMap(); if (!hasStartedTicking) { diff --git a/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/PersistentSimpleServiceRest.java b/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/PersistentSimpleServiceRest.java new file mode 100644 index 0000000000..dc2d2d5aee --- /dev/null +++ b/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/PersistentSimpleServiceRest.java @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package sample.rest.java; + +import javax.ws.rs.Path; +import javax.ws.rs.GET; +import javax.ws.rs.Produces; + +/** + * Try service out by invoking (multiple times): + *
+ * curl http://localhost:9998/persistentjavacount
+ * 
+ * Or browse to the URL from a web browser. + */ +@Path("/persistentjavacount") +public class PersistentSimpleServiceRest { + private PersistentSimpleService service = (PersistentSimpleService) Boot.configurator.getInstance(PersistentSimpleService.class); + + @GET + @Produces({"application/json"}) + public String count() { + return service.count(); + } +} \ No newline at end of file diff --git a/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/SimpleService.java b/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/SimpleService.java index 260df02a3e..44d23e873c 100644 --- a/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/SimpleService.java +++ b/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/SimpleService.java @@ -4,10 +4,6 @@ package sample.rest.java; -import javax.ws.rs.Path; -import javax.ws.rs.GET; -import javax.ws.rs.Produces; - import se.scalablesolutions.akka.actor.ActiveObject; import se.scalablesolutions.akka.actor.ActiveObjectContext; import se.scalablesolutions.akka.actor.annotation.transactionrequired; @@ -16,14 +12,6 @@ import se.scalablesolutions.akka.actor.annotation.postrestart; import se.scalablesolutions.akka.stm.TransactionalState; import se.scalablesolutions.akka.stm.TransactionalMap; -/** - * Try service out by invoking (multiple times): - *
- * curl http://localhost:9998/javacount
- * 
- * Or browse to the URL from a web browser. - */ -@Path("/javacount") @transactionrequired public class SimpleService { private String KEY = "COUNTER"; @@ -32,8 +20,6 @@ public class SimpleService { private TransactionalMap storage; private Receiver receiver = ActiveObject.newInstance(Receiver.class); - @GET - @Produces({"application/json"}) public String count() { if (storage == null) storage = TransactionalState.newMap(); if (!hasStartedTicking) { diff --git a/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/SimpleServiceRest.java b/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/SimpleServiceRest.java new file mode 100644 index 0000000000..ed048f25dc --- /dev/null +++ b/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/SimpleServiceRest.java @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package sample.rest.java; + +import javax.ws.rs.Path; +import javax.ws.rs.GET; +import javax.ws.rs.Produces; + +/** + * Try service out by invoking (multiple times): + *
+ * curl http://localhost:9998/javacount
+ * 
+ * Or browse to the URL from a web browser. + */ +@Path("/javacount") +public class SimpleServiceRest { + private SimpleService service = (SimpleService) Boot.configurator.getInstance(SimpleService.class); + + @GET + @Produces({"application/json"}) + public String count() { + return service.count(); + } +} \ No newline at end of file diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala index e8f3576e9f..eb2d07340e 100644 --- a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala +++ b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala @@ -51,12 +51,12 @@ class SimpleService { @GET @Produces(Array("text/html")) def count = { - //Fetch the first actor of type SimpleServiceActor - //Send it the "Tick" message and expect a NdeSeq back - val result = for{a <- actorsFor(classOf[SimpleServiceActor]).headOption - r <- a.!![NodeSeq]("Tick")} yield r - //Return either the resulting NodeSeq or a default one - result getOrElse Error in counter + //Fetch the first actor of type SimpleServiceActor + //Send it the "Tick" message and expect a NdeSeq back + val result = for{a <- actorsFor(classOf[SimpleServiceActor]).headOption + r <- a.!![NodeSeq]("Tick")} yield r + //Return either the resulting NodeSeq or a default one + result getOrElse Error in counter } } class SimpleServiceActor extends Transactor { @@ -105,12 +105,12 @@ class PersistentSimpleService { @GET @Produces(Array("text/html")) def count = { - //Fetch the first actor of type PersistentSimpleServiceActor - //Send it the "Tick" message and expect a NdeSeq back - val result = for{a <- actorsFor(classOf[PersistentSimpleServiceActor]).headOption - r <- a.!![NodeSeq]("Tick")} yield r - //Return either the resulting NodeSeq or a default one - result getOrElse Error in counter + //Fetch the first actor of type PersistentSimpleServiceActor + //Send it the "Tick" message and expect a NdeSeq back + val result = for{a <- actorsFor(classOf[PersistentSimpleServiceActor]).headOption + r <- a.!![NodeSeq]("Tick")} yield r + //Return either the resulting NodeSeq or a default one + result getOrElse Error in counter } } @@ -147,18 +147,18 @@ class Chat { @Consumes(Array("application/x-www-form-urlencoded")) @Produces(Array("text/html")) def publishMessage(form: MultivaluedMap[String, String]) = { - val msg = ChatMsg(form.getFirst("name"),form.getFirst("action"),form.getFirst("message")) + val msg = ChatMsg(form.getFirst("name"),form.getFirst("action"),form.getFirst("message")) //Fetch the first actor of type ChatActor - //Send it the "Tick" message and expect a NdeSeq back - val result = for{a <- actorsFor(classOf[ChatActor]).headOption - r <- a.!![String](msg)} yield r - //Return either the resulting String or a default one - result getOrElse "System__error" + //Send it the "Tick" message and expect a NdeSeq back + val result = for{a <- actorsFor(classOf[ChatActor]).headOption + r <- a.!![String](msg)} yield r + //Return either the resulting String or a default one + result getOrElse "System__error" } } object ChatActor { - case class ChatMsg(val who: String, val what: String, val msg: String) + case class ChatMsg(val who: String, val what: String, val msg: String) } class ChatActor extends Actor with Logging { diff --git a/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala index 4fd5a8a63d..e6892c7b62 100644 --- a/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala +++ b/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala @@ -120,12 +120,12 @@ class SecureTickService { def paranoiaTick = tick def tick = { - //Fetch the first actor of type PersistentSimpleServiceActor - //Send it the "Tick" message and expect a NdeSeq back - val result = for{a <- actorsFor(classOf[SecureTickActor]).headOption - r <- a.!![Integer]("Tick")} yield r - //Return either the resulting NodeSeq or a default one - result match { + //Fetch the first actor of type PersistentSimpleServiceActor + //Send it the "Tick" message and expect a NdeSeq back + val result = for{a <- actorsFor(classOf[SecureTickActor]).headOption + r <- a.!![Integer]("Tick")} yield r + //Return either the resulting NodeSeq or a default one + result match { case (Some(counter)) => (Tick: {counter}) case _ => (Error in counter) } @@ -147,4 +147,4 @@ class SecureTickActor extends Transactor with Logging { self.reply(new Integer(0)) } } -} \ No newline at end of file +} diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 2d8b8dc43e..81f2ee4580 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -1,26 +1,26 @@ #################### # Akka Config File # #################### - + # This file has all the default settings, so all these could be removed with no visible effect. # Modify as needed. - + filename = "./logs/akka.log" roll = "daily" # Options: never, hourly, daily, sunday/monday/... - level = "DEBUG" # Options: fatal, critical, error, warning, info, debug, trace + level = "debug" # Options: fatal, critical, error, warning, info, debug, trace console = on # syslog_host = "" # syslog_server_name = "" - + version = "0.9" - + # FQN (Fully Qualified Name) to the class doing initial active object/actor # supervisor bootstrap, should be defined in default constructor boot = ["sample.camel.Boot", - "sample.rest.java.Boot", + "sample.rest.java.Boot", "sample.rest.scala.Boot", "sample.security.Boot"] @@ -29,9 +29,9 @@ serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability - + throughput = 5 # default throughput for ExecutorBasedEventDrivenDispatcher - + service = on @@ -41,10 +41,10 @@ jta-aware = off # 'on' means that if there JTA Transaction Manager available then the STM will # begin (or join), commit or rollback the JTA transaction. Default is 'off'. - + - provider = "from-jndi" # Options: "from-jndi" (means that Akka will try to detect a TransactionManager in the JNDI) - # "atomikos" (means that Akka will use the Atomikos based JTA impl in 'akka-jta', + provider = "from-jndi" # Options: "from-jndi" (means that Akka will try to detect a TransactionManager in the JNDI) + # "atomikos" (means that Akka will use the Atomikos based JTA impl in 'akka-jta', # e.g. you need the akka-jta JARs on classpath). timeout = 60000 @@ -56,7 +56,7 @@ filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use resource_packages = ["sample.rest.scala","sample.rest.java","sample.security"] # List with all resource packages for your Jersey services authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now) - + #IF you are using a KerberosAuthenticationActor # # servicePrincipal = "HTTP/localhost@EXAMPLE.COM" @@ -73,11 +73,10 @@ service = on name = "default" # The name of the cluster - actor = "se.scalablesolutions.akka.remote.JGroupsClusterActor" # FQN of an implementation of ClusterActor serializer = "se.scalablesolutions.akka.serialization.Serializer$Java$" # FQN of the serializer class - - + + service = on hostname = "localhost" port = 9999 @@ -89,14 +88,14 @@ read-timeout = 10000 # in millis (10 sec default) - + hostname = "127.0.0.1" # IP address or hostname of one of the Cassandra cluster's seeds port = 9160 consistency-level = "QUORUM" # Options: ZERO, ONE, QUORUM, DCQUORUM, DCQUORUMSYNC, ALL, ANY - + hostname = "127.0.0.1" # IP address or hostname of the MongoDB DB instance port = 27017 diff --git a/config/storage-conf.xml b/config/storage-conf.xml index 06ba8007a2..2647fdcd0e 100644 --- a/config/storage-conf.xml +++ b/config/storage-conf.xml @@ -21,15 +21,15 @@ - akka - - - org.apache.cassandra.locator.EndPointSnitch - + @@ -158,7 +158,7 @@ ~ Authenticator: any IAuthenticator may be used, including your own as long ~ as it is on the classpath. Out of the box, Cassandra provides ~ org.apache.cassandra.auth.AllowAllAuthenticator and, - ~ org.apache.cassandra.auth.SimpleAuthenticator + ~ org.apache.cassandra.auth.SimpleAuthenticator ~ (SimpleAuthenticator uses access.properties and passwd.properties by ~ default). ~ @@ -188,7 +188,7 @@ ~ are sent to the node with the "closest" token, so distributing your ~ tokens equally along the key distribution space will spread keys ~ evenly across your cluster.) This setting is only checked the first - ~ time a node is started. + ~ time a node is started. ~ This can also be useful with RandomPartitioner to force equal spacing ~ of tokens around the hash space, especially for clusters with a small @@ -227,9 +227,9 @@ - 9160 - false @@ -285,16 +285,16 @@ 64 @@ -314,7 +314,7 @@ - +