merged with master; changed renaming of artifacts to use override def artifactID

This commit is contained in:
Andreas Kollegger 2010-06-06 15:08:01 -04:00
commit b4ab04af56
96 changed files with 650 additions and 1717 deletions

3
.gitignore vendored
View file

@ -1,7 +1,8 @@
*~
*#
src_managed
project/plugins/project/
activemq-data
project/plugins/project
project/boot/*
*/project/build/target
*/project/boot

View file

@ -3,24 +3,28 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<name>Akka Functional Tests in Java</name>
<artifactId>akka-fun-test-java</artifactId>
<name>Akka Active Object Tests in Java</name>
<artifactId>akka-active-object-test</artifactId>
<groupId>se.scalablesolutions.akka</groupId>
<version>0.9</version>
<packaging>jar</packaging>
<properties>
<scala.version>2.8.0.RC2</scala.version>
<atmosphere.version>0.5.2</atmosphere.version>
<jersey.version>1.1.5</jersey.version>
<grizzly.version>1.9.18-i</grizzly.version>
<scala.version>2.8.0.RC3</scala.version>
</properties>
<repositories>
<repository>
<id>embedded-repo</id>
<name>Embedded Repository</name>
<url>file://Users/jboner/src/scala/akka/embedded-repo</url>
<url>file:///Users/jboner/src/scala/akka/embedded-repo</url>
<snapshots />
</repository>
<repository>
<id>jboss</id>
<name>JBoss Repository</name>
<url>https://repository.jboss.org/nexus/content/groups/public</url>
<snapshots />
</repository>
</repositories>
@ -35,59 +39,9 @@
<dependencies>
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<artifactId>akka-kernel_2.8.0.RC2</artifactId>
<artifactId>akka-core_2.8.0.RC3</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<artifactId>akka-persistence-cassandra_2.8.0.RC2</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>com.sun.grizzly</groupId>
<artifactId>grizzly-servlet-webserver</artifactId>
<version>${grizzly.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
<version>${jersey.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
<version>${jersey.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
<version>${jersey.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-atom</artifactId>
<version>${jersey.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View file

@ -8,13 +8,9 @@ public class AllTest extends TestCase {
public static Test suite() {
TestSuite suite = new TestSuite("All Java tests");
suite.addTestSuite(InMemoryStateTest.class);
//suite.addTestSuite(InMemNestedStateTest.class);
suite.addTestSuite(InMemNestedStateTest.class);
suite.addTestSuite(RemoteInMemoryStateTest.class);
suite.addTestSuite(ActiveObjectGuiceConfiguratorTest.class);
//suite.addTestSuite(PersistentStateTest.class);
//suite.addTestSuite(PersistentNestedStateTest.class);
//suite.addTestSuite(RemotePersistentStateTest.class);
//suite.addTestSuite(RestTest.class);
return suite;
}

View file

@ -9,7 +9,6 @@ import se.scalablesolutions.akka.config.Config;
import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import static se.scalablesolutions.akka.config.JavaConfig.*;
import se.scalablesolutions.akka.actor.*;
import se.scalablesolutions.akka.kernel.Kernel;
import junit.framework.TestCase;
public class InMemNestedStateTest extends TestCase {

View file

@ -13,7 +13,6 @@ import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import static se.scalablesolutions.akka.config.JavaConfig.*;
import se.scalablesolutions.akka.actor.*;
import se.scalablesolutions.akka.kernel.Kernel;
public class InMemoryStateTest extends TestCase {
static String messageLog = "";

View file

@ -66,23 +66,23 @@ final class ActiveObjectConfiguration {
}
/**
* Holds RTTI (runtime type information) for the Active Object, f.e. current 'sender'
* Holds RTTI (runtime type information) for the Active Object, f.e. current 'sender'
* reference, the 'senderFuture' reference etc.
* <p/>
* In order to make use of this context you have to create a member field in your
* Active Object that has the type 'ActiveObjectContext', then an instance will
* be injected for you to use.
* In order to make use of this context you have to create a member field in your
* Active Object that has the type 'ActiveObjectContext', then an instance will
* be injected for you to use.
* <p/>
* This class does not contain static information but is updated by the runtime system
* at runtime.
* This class does not contain static information but is updated by the runtime system
* at runtime.
* <p/>
* Here is an example of usage:
* Here is an example of usage:
* <pre>
* class Ping {
* // This context will be injected, holds RTTI (runtime type information)
* // for the current message send
* // This context will be injected, holds RTTI (runtime type information)
* // for the current message send
* private ActiveObjectContext context = null;
*
*
* public void hit(int count) {
* Pong pong = (Pong) context.getSender();
* pong.hit(count++)
@ -100,19 +100,19 @@ final class ActiveObjectContext {
* Returns the current sender Active Object reference.
* Scala style getter.
*/
def sender: AnyRef = {
def sender: AnyRef = {
if (_sender eq null) throw new IllegalStateException("Sender reference should not be null.")
else _sender
}
}
/**
* Returns the current sender Active Object reference.
* Java style getter.
*/
def getSender: AnyRef = {
def getSender: AnyRef = {
if (_sender eq null) throw new IllegalStateException("Sender reference should not be null.")
else _sender
}
}
/**
* Returns the current sender future Active Object reference.
@ -364,7 +364,7 @@ object ActiveObject extends Logging {
proxy.asInstanceOf[T]
}
private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actorRef: ActorRef,
private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actorRef: ActorRef,
remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val context = injectActiveObjectContext(target)
val proxy = Proxy.newInstance(Array(intf), Array(target), false, false)
@ -462,7 +462,7 @@ object ActiveObject extends Logging {
if (parent != null) injectActiveObjectContext0(activeObject, parent)
else {
log.warning(
"Can't set 'ActiveObjectContext' for ActiveObject [%s] since no field of this type could be found.",
"Can't set 'ActiveObjectContext' for ActiveObject [%s] since no field of this type could be found.",
activeObject.getClass.getName)
None
}
@ -522,7 +522,7 @@ private[akka] sealed class ActiveObjectAspect {
remoteAddress = init.remoteAddress
timeout = init.timeout
isInitialized = true
}
dispatch(joinPoint)
}
@ -583,7 +583,7 @@ private[akka] sealed class ActiveObjectAspect {
} else future.result
private def isVoid(rtti: MethodRtti) = rtti.getMethod.getReturnType == java.lang.Void.TYPE
private def escapeArguments(args: Array[AnyRef]): Tuple2[Array[AnyRef], Boolean] = {
var isEscaped = false
val escapedArgs = for (arg <- args) yield {
@ -606,11 +606,11 @@ private[akka] sealed class ActiveObjectAspect {
joinPoint: JoinPoint, isOneWay: Boolean, isVoid: Boolean, sender: AnyRef, senderFuture: CompletableFuture[Any]) {
override def toString: String = synchronized {
"Invocation [joinPoint: " + joinPoint.toString +
", isOneWay: " + isOneWay +
"Invocation [joinPoint: " + joinPoint.toString +
", isOneWay: " + isOneWay +
", isVoid: " + isVoid +
", sender: " + sender +
", senderFuture: " + senderFuture +
", sender: " + sender +
", senderFuture: " + senderFuture +
"]"
}
@ -653,11 +653,11 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
private var postRestart: Option[Method] = None
private var initTxState: Option[Method] = None
private var context: Option[ActiveObjectContext] = None
def this(transactionalRequired: Boolean) = this(transactionalRequired,None)
private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef, ctx: Option[ActiveObjectContext]) = {
if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired))
if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired))
self.makeTransactionRequired
self.id = targetClass.getName
target = Some(targetInstance)
@ -705,7 +705,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
def receive = {
case Invocation(joinPoint, isOneWay, _, sender, senderFuture) =>
context.foreach { ctx =>
context.foreach { ctx =>
if (sender ne null) ctx._sender = sender
if (senderFuture ne null) ctx._senderFuture = senderFuture
}

View file

@ -295,8 +295,10 @@ trait Actor extends Logging {
type Receive = Actor.Receive
/*
* For internal use only, functions as the implicit sender references when invoking
* one of the message send functions (!, !! and !!!).
* Option[ActorRef] representation of the 'self' ActorRef reference.
* <p/>
* Mainly for internal use, functions as the implicit sender references when invoking
* one of the message send functions ('!', '!!' and '!!!').
*/
implicit val optionSelf: Option[ActorRef] = {
val ref = Actor.actorRefInCreation.value
@ -313,8 +315,10 @@ trait Actor extends Logging {
}
/*
* For internal use only, functions as the implicit sender references when invoking
* the forward function.
* Some[ActorRef] representation of the 'self' ActorRef reference.
* <p/>
* Mainly for internal use, functions as the implicit sender references when invoking
* the 'forward' function.
*/
implicit val someSelf: Some[ActorRef] = optionSelf.asInstanceOf[Some[ActorRef]]
@ -325,9 +329,31 @@ trait Actor extends Logging {
* <pre>
* self ! message
* </pre>
* Here you also find most of the Actor API.
* <p/>
* For example fields like:
* <pre>
* self.dispactcher = ...
* self.trapExit = ...
* self.faultHandler = ...
* self.lifeCycle = ...
* self.sender
* </pre>
* <p/>
* Here you also find methods like:
* <pre>
* self.reply(..)
* self.link(..)
* self.unlink(..)
* self.start(..)
* self.stop(..)
* </pre>
*/
val self: ActorRef = optionSelf.get
self.id = getClass.getName
val self: ActorRef = {
val zelf = optionSelf.get
zelf.id = getClass.getName
zelf
}
/**
* User overridable callback/setting.
@ -339,64 +365,64 @@ trait Actor extends Logging {
* <pre>
* def receive = {
* case Ping =&gt;
* println("got a ping")
* log.info("got a 'Ping' message")
* self.reply("pong")
*
* case OneWay =&gt;
* println("got a oneway")
* log.info("got a 'OneWay' message")
*
* case _ =&gt;
* println("unknown message, ignoring")
* case unknown =&gt;
* log.warning("unknown message [%s], ignoring", unknown)
* }
* </pre>
*/
protected def receive: Receive
/**
* User overridable callback/setting.
* User overridable callback.
* <p/>
* Optional callback method that is called during initialization.
* To be implemented by subclassing actor.
* Is called when an Actor is started by invoking 'actor.start'.
*/
def init {}
/**
* User overridable callback/setting.
* User overridable callback.
* <p/>
* Mandatory callback method that is called during restart and reinitialization after a server crash.
* To be implemented by subclassing actor.
* Is called when 'actor.stop' is invoked.
*/
def shutdown {}
/**
* User overridable callback.
* <p/>
* Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated.
*/
def preRestart(reason: Throwable) {}
/**
* User overridable callback/setting.
* User overridable callback.
* <p/>
* Mandatory callback method that is called during restart and reinitialization after a server crash.
* To be implemented by subclassing actor.
* Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
*/
def postRestart(reason: Throwable) {}
/**
* User overridable callback/setting.
* User overridable callback.
* <p/>
* Optional callback method that is called during termination.
* To be implemented by subclassing actor.
* Is called during initialization. Can be used to initialize transactional state. Will be invoked within a transaction.
*/
def initTransactionalState {}
/**
* User overridable callback/setting.
* <p/>
* Optional callback method that is called during termination.
* To be implemented by subclassing actor.
*/
def shutdown {}
// =========================================
// ==== INTERNAL IMPLEMENTATION DETAILS ====
// =========================================
private[akka] def base: Receive = lifeCycles orElse (self.hotswap getOrElse receive)
private[akka] def base: Receive = try {
lifeCycles orElse (self.hotswap getOrElse receive)
} catch {
case e: NullPointerException => throw new IllegalStateException(
"The 'self' ActorRef reference for [" + getClass.getName + "] is NULL, error in the ActorRef initialization process.")
}
private val lifeCycles: Receive = {
case HotSwap(code) => self.hotswap = code
@ -414,39 +440,3 @@ trait Actor extends Logging {
override def toString = self.toString
}
/**
* Base class for the different dispatcher types.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed abstract class DispatcherType
/**
* Module that holds the different dispatcher types.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object DispatcherType {
case object EventBasedThreadPooledProxyInvokingDispatcher extends DispatcherType
case object EventBasedSingleThreadDispatcher extends DispatcherType
case object EventBasedThreadPoolDispatcher extends DispatcherType
case object ThreadBasedDispatcher extends DispatcherType
}
/**
* Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model':
* <a href="http://en.wikipedia.org/wiki/Actor_model">http://en.wikipedia.org/wiki/Actor_model</a>
* <p/>
* An actor has a well-defined (non-cyclic) life-cycle.
* <pre>
* => NEW (newly created actor) - can't receive messages (yet)
* => STARTED (when 'start' is invoked) - can receive messages
* => SHUT DOWN (when 'exit' is invoked) - can't do anything
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActorMessageInvoker private[akka] (val actorRef: ActorRef) extends MessageInvoker {
def invoke(handle: MessageInvocation) = actorRef.invoke(handle)
}

View file

@ -222,7 +222,7 @@ trait ActorRef extends TransactionManagement {
* Is defined if the message was sent with sent with '!!' or '!!!', else None.
*/
def senderFuture: Option[CompletableFuture[Any]] = guard.withGuard { _senderFuture }
/**
* Is the actor being restarted?
*/
@ -300,22 +300,6 @@ trait ActorRef extends TransactionManagement {
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
/**
* Sends a message asynchronously and waits on a future for a reply message.
* Uses the time-out defined in the Actor.
* <p/>
* It waits on the reply either until it receives it (in the form of <code>Some(replyMessage)</code>)
* or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics.
* <p/>
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
* implement request/response message exchanges.
* <p/>
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
// def !![T](message: Any)(implicit sender: Option[ActorRef] = None): Option[T] = !![T](message, timeout)
/**
* Sends a message asynchronously returns a future holding the eventual reply message.
* <p/>
@ -349,14 +333,15 @@ trait ActorRef extends TransactionManagement {
* Use <code>self.reply(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* <p/>
* Throws an IllegalStateException if unable to determine what to reply to
* Throws an IllegalStateException if unable to determine what to reply to.
*/
def reply(message: Any) = if(!reply_?(message)) throw new IllegalStateException(
"\n\tNo sender in scope, can't reply. " +
"\n\tYou have probably: " +
"\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." +
"\n\t\t2. Invoked a method on an Active Object from an instance NOT an Active Object.")
"\n\t\t2. Invoked a method on an Active Object from an instance NOT an Active Object." +
"\n\tElse you might want to use 'reply_?' which returns Boolean(true) if succes and Boolean(false) if no sender in scope")
/**
* Use <code>reply_?(..)</code> to reply with a message to the original sender of the message currently
* being processed.
@ -595,7 +580,7 @@ sealed class LocalActorRef private[akka](
@volatile private[akka] var _supervisor: Option[ActorRef] = None
protected[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation]
protected[this] val actorInstance = new AtomicReference[Actor](newActor)
protected[this] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) }
@volatile private var isInInitialization = false
@volatile private var runActorInitialization = false
@ -1224,7 +1209,7 @@ private[akka] case class RemoteActorRef private[akka] (
extends ActorRef {
_uuid = uuuid
timeout = _timeout
start
lazy val remoteClient = RemoteClient.clientFor(hostname, port, loader)

View file

@ -65,6 +65,12 @@ object ActorRegistry extends Logging {
all.toList
}
/**
* Finds any actor that matches T.
*/
def actorFor[T <: Actor](implicit manifest: Manifest[T]): Option[ActorRef] =
actorsFor[T](manifest).headOption
/**
* Finds all actors of the exact type specified by the class passed in as the Class argument.
*/

View file

@ -88,7 +88,8 @@ class AgentException private[akka](message: String) extends RuntimeException(mes
* </pre>
* <p/>
*
* IMPORTANT:
* <b>IMPORTANT</b>:
* <p/>
* You can *not* call 'agent.get', 'agent()' or use the monadic 'foreach',
* 'map' and 'flatMap' within an enclosing transaction since that would block
* the transaction indefinitely. But all other operations are fine. The system
@ -103,7 +104,6 @@ sealed class Agent[T] private (initialValue: T) {
import Actor._
private val dispatcher = actorOf(new AgentDispatcher[T](initialValue)).start
dispatcher ! Value(initialValue)
/**
* Submits a request to read the internal state.
@ -215,7 +215,7 @@ final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transacto
import Actor._
log.debug("Starting up Agent [%s]", self.uuid)
private lazy val value = Ref[T]()
private val value = Ref[T](initialValue)
/**
* Periodically handles incoming messages.
@ -233,6 +233,5 @@ final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transacto
* Performs a CAS operation, atomically swapping the internal state with the value
* provided as a by-name parameter.
*/
private final def swap(newData: => T): Unit = value.swap(newData)
private def swap(newData: => T): Unit = value.swap(newData)
}

View file

@ -7,10 +7,43 @@ package se.scalablesolutions.akka.actor
import java.io.File
import java.net.{URL, URLClassLoader}
import java.util.jar.JarFile
import java.util.Enumeration
import se.scalablesolutions.akka.util.{Bootable, Logging}
import se.scalablesolutions.akka.config.Config._
class AkkaDeployClassLoader(urls : List[URL], parent : ClassLoader) extends URLClassLoader(urls.toArray.asInstanceOf[Array[URL]],parent)
{
override def findResources(resource : String) = {
val normalResult = super.findResources(resource)
if(normalResult.hasMoreElements) normalResult else findDeployed(resource)
}
def findDeployed(resource : String) = new Enumeration[URL]{
private val it = getURLs.flatMap( listClassesInPackage(_,resource) ).iterator
def hasMoreElements = it.hasNext
def nextElement = it.next
}
def listClassesInPackage(jar : URL, pkg : String) = {
val f = new File(jar.getFile)
val jf = new JarFile(f)
try {
val es = jf.entries
var result = List[URL]()
while(es.hasMoreElements)
{
val e = es.nextElement
if(!e.isDirectory && e.getName.startsWith(pkg) && e.getName.endsWith(".class"))
result ::= new URL("jar:" + f.toURI.toURL + "!/" + e)
}
result
} finally {
jf.close
}
}
}
/**
* Handles all modules in the deploy directory (load and unload)
*/
@ -46,10 +79,7 @@ trait BootableActorLoaderService extends Bootable with Logging {
log.debug("Loading dependencies [%s]", dependencyJars)
val allJars = toDeploy ::: dependencyJars
URLClassLoader.newInstance(
allJars.toArray.asInstanceOf[Array[URL]],
Thread.currentThread.getContextClassLoader)
//parentClassLoader)
new AkkaDeployClassLoader(allJars,Thread.currentThread.getContextClassLoader)
} else Thread.currentThread.getContextClassLoader)
}

View file

@ -5,7 +5,7 @@
package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy, ConfiguratorRepository, Configurator}
import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.remote.RemoteServer
import Actor._
@ -120,18 +120,17 @@ class SupervisorFactory private[akka] (val config: SupervisorConfig) extends Log
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed class Supervisor private[akka] (
handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]])
extends Configurator {
handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]]) {
import Supervisor._
private val childActors = new ConcurrentHashMap[String, List[ActorRef]]
private val childSupervisors = new CopyOnWriteArrayList[Supervisor]
private[akka] val supervisor = SupervisorActor(handler, trapExceptions)
private val _childActors = new ConcurrentHashMap[String, List[ActorRef]]
private val _childSupervisors = new CopyOnWriteArrayList[Supervisor]
private[akka] val supervisor = actorOf(new SupervisorActor(handler, trapExceptions)).start
def uuid = supervisor.uuid
def start: Supervisor = {
ConfiguratorRepository.registerConfigurator(this)
this
}
@ -141,15 +140,11 @@ sealed class Supervisor private[akka] (
def unlink(child: ActorRef) = supervisor.unlink(child)
// FIXME recursive search + do not fix if we remove feature that Actors can be RESTful usin Jersey annotations
def getInstance[T](clazz: Class[T]): List[T] = childActors.get(clazz.getName).asInstanceOf[List[T]]
def children: List[ActorRef] =
_childActors.values.toArray.toList.asInstanceOf[List[List[ActorRef]]].flatten
// FIXME recursive search + do not fix if we remove feature that Actors can be RESTful usin Jersey annotations
def getComponentInterfaces: List[Class[_]] =
childActors.values.toArray.toList.asInstanceOf[List[List[AnyRef]]].flatten.map(_.getClass)
// FIXME recursive search + do not fix if we remove feature that Actors can be RESTful usin Jersey annotations
def isDefined(clazz: Class[_]): Boolean = childActors.containsKey(clazz.getName)
def childSupervisors: List[Supervisor] =
_childActors.values.toArray.toList.asInstanceOf[List[Supervisor]]
def configure(config: SupervisorConfig): Unit = config match {
case SupervisorConfig(_, servers) =>
@ -159,11 +154,11 @@ sealed class Supervisor private[akka] (
actorRef.start
val className = actorRef.actor.getClass.getName
val currentActors = {
val list = childActors.get(className)
val list = _childActors.get(className)
if (list eq null) List[ActorRef]()
else list
}
childActors.put(className, actorRef :: currentActors)
_childActors.put(className, actorRef :: currentActors)
actorRef.lifeCycle = Some(lifeCycle)
supervisor.link(actorRef)
remoteAddress.foreach(address =>
@ -171,7 +166,7 @@ sealed class Supervisor private[akka] (
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
val childSupervisor = Supervisor(supervisorConfig)
supervisor.link(childSupervisor.supervisor)
childSupervisors.add(childSupervisor)
_childSupervisors.add(childSupervisor)
})
}
}

View file

@ -40,7 +40,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
* @param clazz the class for the active object
* @return the active objects for the class
*/
override def getInstance[T](clazz: Class[T]): List[T] = synchronized {
def getInstance[T](clazz: Class[T]): List[T] = synchronized {
log.debug("Retrieving active object [%s]", clazz.getName)
if (injector eq null) throw new IllegalStateException(
"inject() and/or supervise() must be called before invoking getInstance(clazz)")
@ -52,7 +52,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
List(proxy.asInstanceOf[T])
}
override def isDefined(clazz: Class[_]): Boolean = synchronized {
def isDefined(clazz: Class[_]): Boolean = synchronized {
activeObjectRegistry.get(clazz).isDefined
}
@ -60,7 +60,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
injector.getInstance(clazz).asInstanceOf[T]
}
override def getComponentInterfaces: List[Class[_]] =
def getComponentInterfaces: List[Class[_]] =
for (c <- components) yield {
if (c.intf.isDefined) c.intf.get
else c.target
@ -122,7 +122,6 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
override def supervise: ActiveObjectConfiguratorBase = synchronized {
if (injector eq null) inject
supervisor = Some(ActiveObject.supervise(restartStrategy, supervised))
ConfiguratorRepository.registerConfigurator(this)
this
}

View file

@ -6,24 +6,7 @@ package se.scalablesolutions.akka.config
import ScalaConfig.{RestartStrategy, Component}
/**
* Manages the active abject or actor that has been put under supervision for the class specified.
*/
private[akka] trait Configurator {
/**
* Returns the active abject or actor that has been put under supervision for the class specified.
*
* @param clazz the class for the active object
* @return the active object for the class
*/
def getInstance[T](clazz: Class[T]): List[T]
def getComponentInterfaces: List[Class[_]]
def isDefined(clazz: Class[_]): Boolean
}
private[akka] trait ActiveObjectConfiguratorBase extends Configurator {
private[akka] trait ActiveObjectConfiguratorBase {
def getExternalDependency[T](clazz: Class[T]): T
def configure(restartStrategy: RestartStrategy, components: List[Component]): ActiveObjectConfiguratorBase

View file

@ -1,29 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.config
import scala.collection.mutable.HashSet
import se.scalablesolutions.akka.util.Logging
object ConfiguratorRepository extends Logging {
private val configuration = new HashSet[Configurator]
def registerConfigurator(conf: Configurator) = synchronized {
configuration += conf
}
def getConfigurators: List[Configurator] = synchronized {
configuration.toList
//configurations.getOrElse(ctx, throw new IllegalArgumentException("No configuration for servlet context [" + ctx + "]"))
}
}
class ConfiguratorRepository extends Logging {
def registerConfigurator(conf: Configurator) = ConfiguratorRepository.registerConfigurator(conf)
def getConfigurators: List[Configurator] = ConfiguratorRepository.getConfigurators
}

View file

@ -7,19 +7,19 @@ package se.scalablesolutions.akka.dispatch
import java.util.{LinkedList, Queue, List}
import java.util.HashMap
import se.scalablesolutions.akka.actor.{ActorMessageInvoker, Actor, ActorRef}
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) extends MessageDispatcher {
@volatile protected var active: Boolean = false
protected val queue = new ReactiveMessageQueue(name)
protected val messageInvokers = new HashMap[AnyRef, MessageInvoker]
protected val messageInvokers = new HashMap[ActorRef, ActorRef]
protected var selectorThread: Thread = _
protected val guard = new Object
def dispatch(invocation: MessageInvocation) = queue.append(invocation)
override def register(actorRef: ActorRef) = synchronized {
messageInvokers.put(actorRef, new ActorMessageInvoker(actorRef))
messageInvokers.put(actorRef, actorRef)
super.register(actorRef)
}

View file

@ -40,7 +40,7 @@ import se.scalablesolutions.akka.config.Config.config
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Dispatchers {
val THROUGHPUT = config.getInt("akka.dispatcher.throughput", 5)
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") {
override def register(actor: ActorRef) = {

View file

@ -54,6 +54,10 @@ import se.scalablesolutions.akka.actor.ActorRef
* the {@link se.scalablesolutions.akka.dispatch.Dispatchers} factory object.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
* @param throughput positive integer indicates the dispatcher will only process so much messages at a time from the
* mailbox, without checking the mailboxes of other actors. Zero or negative means the dispatcher
* always continues until the mailbox is empty.
* Larger values (or zero or negative) increase througput, smaller values increase fairness
*/
class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispatchers.THROUGHPUT) extends MessageDispatcher with ThreadPoolBuilder {
def this(_name: String) = this(_name, Dispatchers.THROUGHPUT) // Needed for Java API usage
@ -70,30 +74,20 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat
def run = {
var lockAcquiredOnce = false
var finishedBeforeMailboxEmpty = false
// this do-while loop is required to prevent missing new messages between the end of the inner while
// loop and releasing the lock
val lock = receiver.dispatcherLock
val mailbox = receiver.mailbox
// this do-while loop is required to prevent missing new messages between the end of the inner while
// loop and releasing the lock
do {
if (lock.tryLock) {
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
lockAcquiredOnce = true
try {
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
var i = 0
var messageInvocation = mailbox.poll
while (messageInvocation != null) {
messageInvocation.invoke
i += 1
if (i < throughput)
messageInvocation = mailbox.poll
else {
finishedBeforeMailboxEmpty = !mailbox.isEmpty
messageInvocation = null
}
}
finishedBeforeMailboxEmpty = processMailbox(receiver)
} finally {
lock.unlock
if (finishedBeforeMailboxEmpty) dispatch(receiver)
if (finishedBeforeMailboxEmpty)
dispatch(receiver)
}
}
} while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !mailbox.isEmpty))
@ -101,6 +95,30 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat
})
} else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started")
/**
* Process the messages in the mailbox of the given actor.
*
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
def processMailbox(receiver: ActorRef): Boolean = {
var processedMessages = 0
var messageInvocation = receiver.mailbox.poll
while (messageInvocation != null) {
messageInvocation.invoke
processedMessages += 1
// check if we simply continue with other messages, or reached the throughput limit
if (throughput <= 0 || processedMessages < throughput)
messageInvocation = receiver.mailbox.poll
else {
return !receiver.mailbox.isEmpty
messageInvocation = null
}
}
return false
}
def start = if (!active) {
log.debug("Starting ExecutorBasedEventDrivenDispatcher [%s]", name)
log.debug("Throughput for %s = %d", name, throughput)

View file

@ -13,6 +13,9 @@ import java.util.concurrent.ConcurrentHashMap
import org.multiverse.commitbarriers.CountDownCommitBarrier
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
final class MessageInvocation(val receiver: ActorRef,
val message: Any,
val sender: Option[ActorRef],
@ -49,14 +52,16 @@ final class MessageInvocation(val receiver: ActorRef,
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait MessageQueue {
def append(handle: MessageInvocation)
}
trait MessageInvoker {
def invoke(message: MessageInvocation)
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait MessageDispatcher extends Logging {
protected val references = new ConcurrentHashMap[String, ActorRef]
def dispatch(invocation: MessageInvocation)
@ -65,14 +70,16 @@ trait MessageDispatcher extends Logging {
def register(actorRef: ActorRef) = references.put(actorRef.uuid, actorRef)
def unregister(actorRef: ActorRef) = {
references.remove(actorRef.uuid)
if (canBeShutDown)
shutdown // shut down in the dispatcher's references is zero
if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero
}
def canBeShutDown: Boolean = references.isEmpty
def isShutdown: Boolean
def usesActorMailbox : Boolean
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait MessageDemultiplexer {
def select
def wakeUp

View file

@ -7,17 +7,16 @@ package se.scalablesolutions.akka.dispatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.Queue
import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorMessageInvoker}
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ThreadBasedDispatcher(actor: ActorRef) extends MessageDispatcher {
class ThreadBasedDispatcher(private val actor: ActorRef) extends MessageDispatcher {
private val name = actor.getClass.getName + ":" + actor.uuid
private val threadName = "thread-based:dispatcher:" + name
private val messageHandler = new ActorMessageInvoker(actor)
private val queue = new BlockingMessageQueue(name)
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
@ -30,7 +29,7 @@ class ThreadBasedDispatcher(actor: ActorRef) extends MessageDispatcher {
override def run = {
while (active) {
try {
messageHandler.invoke(queue.take)
actor.invoke(queue.take)
} catch { case e: InterruptedException => active = false }
}
}

View file

@ -261,11 +261,11 @@ object Cluster extends Cluster with Logging {
sup <- createSupervisor(actorRef)
} {
val serializer = Class.forName(config.getString(
"akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME))
.newInstance.asInstanceOf[Serializer]
"akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME))
.newInstance.asInstanceOf[Serializer]
classLoader = serializerClassLoader orElse classLoader
serializer.classLoader = classLoader
classLoader = serializerClassLoader orElse classLoader
serializer.classLoader = classLoader
actorRef.start
sup.start
actorRef ! InitClusterActor(serializer)

View file

@ -82,19 +82,19 @@ object RemoteClient extends Logging {
private[akka] def actorFor(uuid: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef =
RemoteActorRef(uuid, className, hostname, port, timeout, loader)
def clientFor(hostname: String, port: Int): RemoteClient =
def clientFor(hostname: String, port: Int): RemoteClient =
clientFor(new InetSocketAddress(hostname, port), None)
def clientFor(hostname: String, port: Int, loader: ClassLoader): RemoteClient =
def clientFor(hostname: String, port: Int, loader: ClassLoader): RemoteClient =
clientFor(new InetSocketAddress(hostname, port), Some(loader))
def clientFor(address: InetSocketAddress): RemoteClient =
def clientFor(address: InetSocketAddress): RemoteClient =
clientFor(address, None)
def clientFor(address: InetSocketAddress, loader: ClassLoader): RemoteClient =
def clientFor(address: InetSocketAddress, loader: ClassLoader): RemoteClient =
clientFor(address, Some(loader))
private[akka] def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient =
private[akka] def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient =
clientFor(new InetSocketAddress(hostname, port), loader)
private[akka] def clientFor(address: InetSocketAddress, loader: Option[ClassLoader]): RemoteClient = synchronized {
@ -155,7 +155,7 @@ object RemoteClient extends Logging {
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteClient(val hostname: String, val port: Int, loader: Option[ClassLoader]) extends Logging {
class RemoteClient private[akka] (val hostname: String, val port: Int, loader: Option[ClassLoader]) extends Logging {
val name = "RemoteClient@" + hostname + "::" + port
@volatile private[remote] var isRunning = false
@ -203,6 +203,10 @@ class RemoteClient(val hostname: String, val port: Int, loader: Option[ClassLoad
}
}
def registerListener(actorRef: ActorRef) = listeners.add(actorRef)
def deregisterListener(actorRef: ActorRef) = listeners.remove(actorRef)
def send[T](request: RemoteRequestProtocol, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = if (isRunning) {
if (request.getIsOneWay) {
connection.getChannel.write(request)
@ -222,17 +226,13 @@ class RemoteClient(val hostname: String, val port: Int, loader: Option[ClassLoad
throw exception
}
def registerSupervisorForActor(actorRef: ActorRef) =
private[akka] def registerSupervisorForActor(actorRef: ActorRef) =
if (!actorRef.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actorRef + " since it is not under supervision")
else supervisors.putIfAbsent(actorRef.supervisor.get.uuid, actorRef)
def deregisterSupervisorForActor(actorRef: ActorRef) =
private[akka] def deregisterSupervisorForActor(actorRef: ActorRef) =
if (!actorRef.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actorRef + " since it is not under supervision")
else supervisors.remove(actorRef.supervisor.get.uuid)
def registerListener(actorRef: ActorRef) = listeners.add(actorRef)
def deregisterListener(actorRef: ActorRef) = listeners.remove(actorRef)
}
/**
@ -330,7 +330,7 @@ class RemoteClientHandler(val name: String,
client.connection = bootstrap.connect(remoteAddress)
client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
if (!client.connection.isSuccess) {
client.listeners.toArray.foreach(l =>
client.listeners.toArray.foreach(l =>
l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause))
log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
}
@ -339,13 +339,13 @@ class RemoteClientHandler(val name: String,
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.listeners.toArray.foreach(l =>
client.listeners.toArray.foreach(l =>
l.asInstanceOf[ActorRef] ! RemoteClientConnected(client.hostname, client.port))
log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.listeners.toArray.foreach(l =>
client.listeners.toArray.foreach(l =>
l.asInstanceOf[ActorRef] ! RemoteClientDisconnected(client.hostname, client.port))
log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress)
}

View file

@ -184,7 +184,7 @@ class RemoteServer extends Logging {
def start(_hostname: String, _port: Int): RemoteServer =
start(_hostname, _port, None)
private def start(_hostname: String, _port: Int, loader: ClassLoader): RemoteServer =
private def start(_hostname: String, _port: Int, loader: ClassLoader): RemoteServer =
start(_hostname, _port, Some(loader))
private def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): RemoteServer = synchronized {
@ -364,12 +364,12 @@ class RemoteServerHandler(
val actorRef = createActor(request.getTarget, request.getUuid, request.getTimeout)
actorRef.start
val message = RemoteProtocolBuilder.getMessage(request)
if (request.hasSender) {
val sender = request.getSender
if (sender ne null) actorRef.!(message)(Some(ActorRef.fromProtobuf(sender, applicationLoader)))
} else {
val sender = if (request.hasSender) Some(ActorRef.fromProtobuf(request.getSender, applicationLoader))
else None
if (request.getIsOneWay) actorRef.!(message)(sender)
else {
try {
val resultOrNone = actorRef !! message
val resultOrNone = actorRef.!!(message)(sender)
val result: AnyRef = if (resultOrNone.isDefined) resultOrNone.get else null
log.debug("Returning result from actor invocation [%s]", result)
val replyBuilder = RemoteReplyProtocol.newBuilder

View file

@ -13,7 +13,7 @@ case class Listen(listener: ActorRef) extends ListenerMessage
case class Deafen(listener: ActorRef) extends ListenerMessage
case class WithListeners(f: List[ActorRef] => Unit) extends ListenerMessage
/**
/**
* Listeners is a generic trait to implement listening capability on an Actor.
* <p/>
* Use the <code>gossip(msg)</code> method to have it sent to the listeners.
@ -34,6 +34,6 @@ trait Listeners { self: Actor =>
}
protected def gossip(msg: Any) = listenersAsList foreach (_ ! msg)
private def listenersAsList: List[ActorRef] = listeners.toArray.toList.asInstanceOf[List[ActorRef]]
}

View file

@ -164,7 +164,7 @@ object Transaction {
}
/**
* See ScalaDoc on Transaction.Local class.
* See ScalaDoc on Transaction.Local class.
*/
def atomically[A](firstBody: => A) = elseBody(firstBody)
@ -282,10 +282,10 @@ object Transaction {
setTransaction(Some(tx))
mtx.registerLifecycleListener(new TransactionLifecycleListener() {
def notify(mtx: MultiverseTransaction, event: TransactionLifecycleEvent) = event.name match {
case "postCommit" =>
case "postCommit" =>
log.trace("Committing transaction [%s]", mtx)
tx.commit
case "postAbort" =>
case "postAbort" =>
log.trace("Aborting transaction [%s]", mtx)
tx.abort
case _ => {}

View file

@ -21,14 +21,14 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat
val (testMsg1,testMsg2,testMsg3,testMsg4) = ("test1","test2","test3","test4")
val targetOk = new AtomicInteger(0)
val t1 = actorOf( new Actor() {
def receive = {
def receive = {
case `testMsg1` => self.reply(3)
case `testMsg2` => self.reply(7)
}
} ).start
val t2 = actorOf( new Actor() {
def receive = {
def receive = {
case `testMsg3` => self.reply(11)
}
}).start
@ -43,9 +43,9 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat
b <- (d.!![Int](testMsg2,5000))
c <- (d.!![Int](testMsg3,5000))
} yield a + b + c
result.get must be(21)
for(a <- List(t1,t2,d)) a.stop
for(a <- List(t1,t2,d)) a.stop
}
@Test def testLogger = {

View file

@ -95,7 +95,7 @@ class StmSpec extends
val size2: Int = (actor !! Size).getOrElse(fail("Could not get Vector::size"))
size2 should equal(3)
} catch {
case e =>
case e =>
e.printStackTrace
fail(e.toString)
}
@ -122,7 +122,7 @@ class StmSpec extends
val size4: Int = (actor !! Size).getOrElse(fail("Could not get size"))
size4 should equal(3)
} catch {
case e =>
case e =>
fail(e.toString)
}
}
@ -130,7 +130,7 @@ class StmSpec extends
/*
describe("Multiverse API") {
it("should blablabla") {
import org.multiverse.api.programmatic._
// import org.multiverse.api._
import org.multiverse.templates._
@ -139,13 +139,13 @@ class StmSpec extends
import org.multiverse.api.{GlobalStmInstance, ThreadLocalTransaction, Transaction => MultiverseTransaction}
import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent}
import org.multiverse.commitbarriers._
def createRef[T]: ProgrammaticReference[T] = GlobalStmInstance
.getGlobalStmInstance
.getProgrammaticReferenceFactoryBuilder
.build
.atomicCreateReference(null.asInstanceOf[T])
val ref1 = Ref(0)//createRef[Int]
val ref2 = Ref(0)//createRef[Int]
@ -185,13 +185,13 @@ class GlobalTransactionVectorTestActor extends Actor {
import se.scalablesolutions.akka.stm.Transaction.Global
private val vector: TransactionalVector[Int] = Global.atomic { TransactionalVector(1) }
def receive = {
case Add(value) =>
case Add(value) =>
Global.atomic { vector + value}
self.reply(Success)
case Size =>
case Size =>
val size = Global.atomic { vector.size }
self.reply(size)
}
@ -200,12 +200,12 @@ class GlobalTransactionVectorTestActor extends Actor {
class NestedTransactorLevelOneActor extends Actor {
import GlobalTransactionVectorTestActor._
private val nested = actorOf[NestedTransactorLevelTwoActor].start
def receive = {
case add @ Add(_) =>
case add @ Add(_) =>
self.reply((nested !! add).get)
case Size =>
case Size =>
self.reply((nested !! Size).get)
case "HiLevelOne" => println("HiLevelOne")
@ -216,15 +216,15 @@ class NestedTransactorLevelOneActor extends Actor {
class NestedTransactorLevelTwoActor extends Actor {
import GlobalTransactionVectorTestActor._
private val ref = Ref(0)
def receive = {
case Add(value) =>
case Add(value) =>
ref.swap(value)
self.reply(Success)
case Size =>
case Size =>
self.reply(ref.getOrElse(-1))
case "HiLevelTwo" => println("HiLevelTwo")
}
}

View file

@ -1,14 +0,0 @@
package se.scalablesolutions.akka.api;
import javax.ws.rs.Path;
import javax.ws.rs.GET;
import javax.ws.rs.Produces;
@Path("/foo")
public class JerseyFoo {
@GET
@Produces({"application/json"})
public String foo() {
return "hello foo";
}
}

View file

@ -1,11 +0,0 @@
package se.scalablesolutions.akka.api;
public class PersistenceManager {
private static volatile boolean isRunning = false;
public static void init() {
if (!isRunning) {
se.scalablesolutions.akka.kernel.Kernel$.MODULE$.startRemoteService();
isRunning = true;
}
}
}

View file

@ -1,26 +0,0 @@
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.persistence.common.*;
import se.scalablesolutions.akka.persistence.cassandra.*;
import se.scalablesolutions.akka.actor.annotation.inittransactionalstate;
public class PersistentClasher {
private PersistentMap state;
@inittransactionalstate
public void init() {
state = CassandraStorage.newMap();
}
public String getState(String key) {
return (String)state.get(key).get();
}
public void setState(String key, String msg) {
state.put(key, msg);
}
public void clash() {
state.put("clasher", "was here");
}
}

View file

@ -1,7 +0,0 @@
package se.scalablesolutions.akka.api;
public class PersistentFailer implements java.io.Serializable {
public int fail() {
throw new RuntimeException("expected");
}
}

View file

@ -1,110 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.config.*;
import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import static se.scalablesolutions.akka.config.JavaConfig.*;
import se.scalablesolutions.akka.actor.*;
import se.scalablesolutions.akka.kernel.Kernel;
import junit.framework.TestCase;
public class PersistentNestedStateTest extends TestCase {
static String messageLog = "";
final private ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
protected void setUp() {
PersistenceManager.init();
conf.configure(
new RestartStrategy(new AllForOne(), 3, 5000, new Class[] {Exception.class}),
new Component[]{
new Component(PersistentStateful.class, new LifeCycle(new Permanent()), 10000000),
new Component(PersistentStatefulNested.class, new LifeCycle(new Permanent()), 10000000),
new Component(PersistentFailer.class, new LifeCycle(new Permanent()), 1000)
//new Component("inmem-clasher", InMemClasher.class, InMemClasherImpl.class, new LifeCycle(new Permanent()), 100000)
}).inject().supervise();
}
protected void tearDown() {
conf.stop();
}
public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() throws Exception {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class);
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
nested.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
assertEquals("new state", nested.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}
public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class);
nested.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
PersistentFailer failer = conf.getInstance(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state
assertEquals("init", nested.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state
}
public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setVectorState("init"); // set init state
PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class);
nested.setVectorState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
assertEquals(2, stateful.getVectorLength()); // BAD: keeps one element since last test
assertEquals(2, nested.getVectorLength());
}
public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setVectorState("init"); // set init state
PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class);
nested.setVectorState("init"); // set init state
PersistentFailer failer = conf.getInstance(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals(1, stateful.getVectorLength());
assertEquals(1, nested.getVectorLength());
}
public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class);
stateful.setRefState("init"); // set init state
nested.setRefState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
assertEquals("new state", stateful.getRefState());
assertEquals("new state", nested.getRefState());
}
public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class);
stateful.setRefState("init"); // set init state
nested.setRefState("init"); // set init state
PersistentFailer failer = conf.getInstance(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getRefState()); // check that state is == init state
assertEquals("init", nested.getRefState()); // check that state is == init state
}
}

View file

@ -1,89 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.config.*;
import static se.scalablesolutions.akka.config.JavaConfig.*;
import junit.framework.TestCase;
public class PersistentStateTest extends TestCase {
static String messageLog = "";
final private ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
protected void setUp() {
PersistenceManager.init();
conf.configure(
new RestartStrategy(new AllForOne(), 3, 5000, new Class[] {Exception.class}),
new Component[] {
new Component(PersistentStateful.class, new LifeCycle(new Permanent()), 10000000),
new Component(PersistentFailer.class, new LifeCycle(new Permanent()), 1000)
//new Component(PersistentClasher.class, new LifeCycle(new Permanent()), 100000)
}).supervise();
}
protected void tearDown() {
conf.stop();
}
public void testShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}
public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
PersistentFailer failer = conf.getInstance(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state
}
public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setVectorState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
assertEquals("init", stateful.getVectorState(0));
assertEquals("new state", stateful.getVectorState(1));
}
public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setVectorState("init"); // set init state
PersistentFailer failer = conf.getInstance(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getVectorState(0)); // check that state is == init state
}
public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setRefState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
assertEquals("new state", stateful.getRefState());
}
public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setRefState("init"); // set init state
PersistentFailer failer = conf.getInstance(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getRefState()); // check that state is == init state
}
}

View file

@ -1,84 +0,0 @@
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.actor.annotation.inittransactionalstate;
import se.scalablesolutions.akka.actor.annotation.transactionrequired;
import se.scalablesolutions.akka.persistence.common.*;
import se.scalablesolutions.akka.persistence.cassandra.*;
@transactionrequired
public class PersistentStateful {
private PersistentMap mapState;
private PersistentVector vectorState;
private PersistentRef refState;
@inittransactionalstate
public void init() {
mapState = CassandraStorage.newMap();
vectorState = CassandraStorage.newVector();
refState = CassandraStorage.newRef();
}
public String getMapState(String key) {
byte[] bytes = (byte[]) mapState.get(key.getBytes()).get();
return new String(bytes, 0, bytes.length);
}
public String getVectorState(int index) {
byte[] bytes = (byte[]) vectorState.get(index);
return new String(bytes, 0, bytes.length);
}
public int getVectorLength() {
return vectorState.length();
}
public String getRefState() {
if (refState.isDefined()) {
byte[] bytes = (byte[]) refState.get().get();
return new String(bytes, 0, bytes.length);
} else throw new IllegalStateException("No such element");
}
public void setMapState(String key, String msg) {
mapState.put(key.getBytes(), msg.getBytes());
}
public void setVectorState(String msg) {
vectorState.add(msg.getBytes());
}
public void setRefState(String msg) {
refState.swap(msg.getBytes());
}
public void success(String key, String msg) {
mapState.put(key.getBytes(), msg.getBytes());
vectorState.add(msg.getBytes());
refState.swap(msg.getBytes());
}
public String failure(String key, String msg, PersistentFailer failer) {
mapState.put(key.getBytes(), msg.getBytes());
vectorState.add(msg.getBytes());
refState.swap(msg.getBytes());
failer.fail();
return msg;
}
public String success(String key, String msg, PersistentStatefulNested nested) {
mapState.put(key.getBytes(), msg.getBytes());
vectorState.add(msg.getBytes());
refState.swap(msg.getBytes());
nested.success(key, msg);
return msg;
}
public String failure(String key, String msg, PersistentStatefulNested nested, PersistentFailer failer) {
mapState.put(key.getBytes(), msg.getBytes());
vectorState.add(msg.getBytes());
refState.swap(msg.getBytes());
nested.failure(key, msg, failer);
return msg;
}
}

View file

@ -1,70 +0,0 @@
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.actor.annotation.inittransactionalstate;
import se.scalablesolutions.akka.actor.annotation.transactionrequired;
import se.scalablesolutions.akka.persistence.common.*;
import se.scalablesolutions.akka.persistence.cassandra.*;
@transactionrequired
public class PersistentStatefulNested {
private PersistentMap mapState;
private PersistentVector vectorState;
private PersistentRef refState;
@inittransactionalstate
public void init() {
mapState = CassandraStorage.newMap();
vectorState = CassandraStorage.newVector();
refState = CassandraStorage.newRef();
}
public String getMapState(String key) {
byte[] bytes = (byte[]) mapState.get(key.getBytes()).get();
return new String(bytes, 0, bytes.length);
}
public String getVectorState(int index) {
byte[] bytes = (byte[]) vectorState.get(index);
return new String(bytes, 0, bytes.length);
}
public int getVectorLength() {
return vectorState.length();
}
public String getRefState() {
if (refState.isDefined()) {
byte[] bytes = (byte[]) refState.get().get();
return new String(bytes, 0, bytes.length);
} else throw new IllegalStateException("No such element");
}
public void setMapState(String key, String msg) {
mapState.put(key.getBytes(), msg.getBytes());
}
public void setVectorState(String msg) {
vectorState.add(msg.getBytes());
}
public void setRefState(String msg) {
refState.swap(msg.getBytes());
}
public String success(String key, String msg) {
mapState.put(key.getBytes(), msg.getBytes());
vectorState.add(msg.getBytes());
refState.swap(msg.getBytes());
return msg;
}
public String failure(String key, String msg, PersistentFailer failer) {
mapState.put(key.getBytes(), msg.getBytes());
vectorState.add(msg.getBytes());
refState.swap(msg.getBytes());
failer.fail();
return msg;
}
}

View file

@ -1,403 +0,0 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
package se.scalablesolutions.akka.api;
public final class ProtobufProtocol {
private ProtobufProtocol() {}
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistry registry) {
}
public static final class ProtobufPOJO extends
com.google.protobuf.GeneratedMessage {
// Use ProtobufPOJO.newBuilder() to construct.
private ProtobufPOJO() {}
private static final ProtobufPOJO defaultInstance = new ProtobufPOJO();
public static ProtobufPOJO getDefaultInstance() {
return defaultInstance;
}
public ProtobufPOJO getDefaultInstanceForType() {
return defaultInstance;
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return se.scalablesolutions.akka.api.ProtobufProtocol.internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return se.scalablesolutions.akka.api.ProtobufProtocol.internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_fieldAccessorTable;
}
// required uint64 id = 1;
public static final int ID_FIELD_NUMBER = 1;
private boolean hasId;
private long id_ = 0L;
public boolean hasId() { return hasId; }
public long getId() { return id_; }
// required string name = 2;
public static final int NAME_FIELD_NUMBER = 2;
private boolean hasName;
private java.lang.String name_ = "";
public boolean hasName() { return hasName; }
public java.lang.String getName() { return name_; }
// required bool status = 3;
public static final int STATUS_FIELD_NUMBER = 3;
private boolean hasStatus;
private boolean status_ = false;
public boolean hasStatus() { return hasStatus; }
public boolean getStatus() { return status_; }
public final boolean isInitialized() {
if (!hasId) return false;
if (!hasName) return false;
if (!hasStatus) return false;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
if (hasId()) {
output.writeUInt64(1, getId());
}
if (hasName()) {
output.writeString(2, getName());
}
if (hasStatus()) {
output.writeBool(3, getStatus());
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (hasId()) {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(1, getId());
}
if (hasName()) {
size += com.google.protobuf.CodedOutputStream
.computeStringSize(2, getName());
}
if (hasStatus()) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(3, getStatus());
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeDelimitedFrom(input).buildParsed();
}
public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeDelimitedFrom(input, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder> {
private se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO result;
// Construct using se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.newBuilder()
private Builder() {}
private static Builder create() {
Builder builder = new Builder();
builder.result = new se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO();
return builder;
}
protected se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO internalGetResult() {
return result;
}
public Builder clear() {
if (result == null) {
throw new IllegalStateException(
"Cannot call clear() after build().");
}
result = new se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO();
return this;
}
public Builder clone() {
return create().mergeFrom(result);
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.getDescriptor();
}
public se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO getDefaultInstanceForType() {
return se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.getDefaultInstance();
}
public boolean isInitialized() {
return result.isInitialized();
}
public se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO build() {
if (result != null && !isInitialized()) {
throw newUninitializedMessageException(result);
}
return buildPartial();
}
private se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO buildParsed()
throws com.google.protobuf.InvalidProtocolBufferException {
if (!isInitialized()) {
throw newUninitializedMessageException(
result).asInvalidProtocolBufferException();
}
return buildPartial();
}
public se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO buildPartial() {
if (result == null) {
throw new IllegalStateException(
"build() has already been called on this Builder.");
}
se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO returnMe = result;
result = null;
return returnMe;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO) {
return mergeFrom((se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO other) {
if (other == se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.getDefaultInstance()) return this;
if (other.hasId()) {
setId(other.getId());
}
if (other.hasName()) {
setName(other.getName());
}
if (other.hasStatus()) {
setStatus(other.getStatus());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder(
this.getUnknownFields());
while (true) {
int tag = input.readTag();
switch (tag) {
case 0:
this.setUnknownFields(unknownFields.build());
return this;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
this.setUnknownFields(unknownFields.build());
return this;
}
break;
}
case 8: {
setId(input.readUInt64());
break;
}
case 18: {
setName(input.readString());
break;
}
case 24: {
setStatus(input.readBool());
break;
}
}
}
}
// required uint64 id = 1;
public boolean hasId() {
return result.hasId();
}
public long getId() {
return result.getId();
}
public Builder setId(long value) {
result.hasId = true;
result.id_ = value;
return this;
}
public Builder clearId() {
result.hasId = false;
result.id_ = 0L;
return this;
}
// required string name = 2;
public boolean hasName() {
return result.hasName();
}
public java.lang.String getName() {
return result.getName();
}
public Builder setName(java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
result.hasName = true;
result.name_ = value;
return this;
}
public Builder clearName() {
result.hasName = false;
result.name_ = getDefaultInstance().getName();
return this;
}
// required bool status = 3;
public boolean hasStatus() {
return result.hasStatus();
}
public boolean getStatus() {
return result.getStatus();
}
public Builder setStatus(boolean value) {
result.hasStatus = true;
result.status_ = value;
return this;
}
public Builder clearStatus() {
result.hasStatus = false;
result.status_ = false;
return this;
}
}
static {
se.scalablesolutions.akka.api.ProtobufProtocol.getDescriptor();
}
static {
se.scalablesolutions.akka.api.ProtobufProtocol.internalForceInit();
}
}
private static com.google.protobuf.Descriptors.Descriptor
internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
return descriptor;
}
private static com.google.protobuf.Descriptors.FileDescriptor
descriptor;
static {
java.lang.String[] descriptorData = {
"\n4se/scalablesolutions/akka/api/Protobuf" +
"Protocol.proto\022\035se.scalablesolutions.akk" +
"a.api\"8\n\014ProtobufPOJO\022\n\n\002id\030\001 \002(\004\022\014\n\004nam" +
"e\030\002 \002(\t\022\016\n\006status\030\003 \002(\010"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
public com.google.protobuf.ExtensionRegistry assignDescriptors(
com.google.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_descriptor =
getDescriptor().getMessageTypes().get(0);
internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_descriptor,
new java.lang.String[] { "Id", "Name", "Status", },
se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.class,
se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.Builder.class);
return null;
}
};
com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[] {
}, assigner);
}
public static void internalForceInit() {}
}

View file

@ -1,17 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.api;
/*
Compile with:
cd ./akka-fun-test-java/src/test/java
protoc se/scalablesolutions/akka/api/ProtobufProtocol.proto --java_out .
*/
message ProtobufPOJO {
required uint64 id = 1;
required string name = 2;
required bool status = 3;
}

View file

@ -1,38 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.api;
import junit.framework.TestCase;
import se.scalablesolutions.akka.serialization.SerializerFactory;
public class ProtobufSerializationTest extends TestCase {
public void testOutIn() throws Exception {
SerializerFactory factory = new SerializerFactory();
ProtobufProtocol.ProtobufPOJO pojo1 = ProtobufProtocol.ProtobufPOJO.getDefaultInstance().toBuilder().setId(1).setName("protobuf").setStatus(true).build();
byte[] bytes = factory.getProtobuf().out(pojo1);
Object obj = factory.getProtobuf().in(bytes, pojo1.getClass());
assertTrue(obj instanceof ProtobufProtocol.ProtobufPOJO);
ProtobufProtocol.ProtobufPOJO pojo2 = (ProtobufProtocol.ProtobufPOJO)obj;
assertEquals(pojo1.getId(), pojo2.getId());
assertEquals(pojo1.getName(), pojo2.getName());
assertEquals(pojo1.getStatus(), pojo2.getStatus());
}
public void testDeepClone() throws Exception {
SerializerFactory factory = new SerializerFactory();
ProtobufProtocol.ProtobufPOJO pojo1 = ProtobufProtocol.ProtobufPOJO.getDefaultInstance().toBuilder().setId(1).setName("protobuf").setStatus(true).build();
Object obj = factory.getProtobuf().deepClone(pojo1);
assertTrue(obj instanceof ProtobufProtocol.ProtobufPOJO);
ProtobufProtocol.ProtobufPOJO pojo2 = (ProtobufProtocol.ProtobufPOJO)obj;
assertEquals(pojo1.getId(), pojo2.getId());
assertEquals(pojo1.getName(), pojo2.getName());
assertEquals(pojo1.getStatus(), pojo2.getStatus());
}
}

View file

@ -1,88 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.config.*;
import static se.scalablesolutions.akka.config.JavaConfig.*;
import junit.framework.TestCase;
public class RemotePersistentStateTest extends TestCase {
static String messageLog = "";
final private ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
protected void setUp() {
PersistenceManager.init();
conf.configure(
new RestartStrategy(new AllForOne(), 3, 5000, new Class[]{Exception.class}),
new Component[] {
new Component(PersistentStateful.class, new LifeCycle(new Permanent()), 1000000, new RemoteAddress("localhost", 9999)),
new Component(PersistentFailer.class, new LifeCycle(new Permanent()), 1000000, new RemoteAddress("localhost", 9999))
}).supervise();
}
protected void tearDown() {
conf.stop();
}
public void testShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}
public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
PersistentFailer failer = conf.getInstance(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "MapShouldRollBack", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state
}
public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
int init = stateful.getVectorLength();
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "VectorShouldNotRollback"); // transactionrequired
assertEquals(init + 1, stateful.getVectorLength());
}
public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
int init = stateful.getVectorLength();
PersistentFailer failer = conf.getInstance(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals(init, stateful.getVectorLength());
}
public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setRefState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
assertEquals("new state", stateful.getRefState());
}
public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setRefState("init"); // set init state
PersistentFailer failer = conf.getInstance(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getRefState()); // check that state is == init state
}
}

View file

@ -1,16 +0,0 @@
<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
<suite name="Tests for Akka Java API module" verbose="1">
<test name="core">
<groups>
<run>
<include name="functional"/>
<include name="unit"/>
<exclude name="broken"/>
</run>
</groups>
<packages>
<package name="com.scalablesolutions.akka.api.*" />
</packages>
</test>
</suite>

View file

@ -47,29 +47,29 @@ class AkkaLoader extends Logging {
private def printBanner = {
log.info(
"""
t
t t t
t t tt t
tt t t tt t
t ttttttt t ttt t
t tt ttt t ttt t
t t ttt t ttt t t
tt t ttt ttt ttt t
t t ttt ttt t tt t
t ttt ttt t t
tt ttt ttt t
ttt ttt
tttttttt ttt ttt ttt ttt tttttttt
ttt tt ttt ttt ttt ttt ttt ttt
ttt ttt ttt ttt ttt ttt ttt ttt
ttt ttt ttt ttt ttt tt ttt ttt
tttt ttttttttt tttttttt tttt
ttttttttt ttt ttt ttt ttt ttttttttt
ttt ttt ttt ttt ttt ttt ttt ttt
ttt ttt ttt ttt ttt ttt ttt ttt
ttt tt ttt ttt ttt ttt ttt ttt
t
t t t
t t tt t
tt t t tt t
t ttttttt t ttt t
t tt ttt t ttt t
t t ttt t ttt t t
tt t ttt ttt ttt t
t t ttt ttt t tt t
t ttt ttt t t
tt ttt ttt t
ttt ttt
tttttttt ttt ttt ttt ttt tttttttt
ttt tt ttt ttt ttt ttt ttt ttt
ttt ttt ttt ttt ttt ttt ttt ttt
ttt ttt ttt ttt ttt tt ttt ttt
tttt ttttttttt tttttttt tttt
ttttttttt ttt ttt ttt ttt ttttttttt
ttt ttt ttt ttt ttt ttt ttt ttt
ttt ttt ttt ttt ttt ttt ttt ttt
ttt tt ttt ttt ttt ttt ttt ttt
tttttttt ttt ttt ttt ttt tttttttt
==================================================
""")
log.info(" Running version %s", Config.VERSION)

View file

@ -50,14 +50,14 @@ trait EmbeddedAppServer extends Bootable with Logging {
Thread.currentThread.setContextClassLoader(applicationLoader.get)
super.init(sc)
}
finally {
finally {
Thread.currentThread.setContextClassLoader(cl)
}
}
})
adapter.setContextPath(uri.getPath)
adapter.addInitParameter("cometSupport",
adapter.addInitParameter("cometSupport",
"org.atmosphere.container.GrizzlyCometSupport")
adapter.addInitParameter("com.sun.jersey.config.property.resourceConfigClass",
"com.sun.jersey.api.core.PackagesResourceConfig")
@ -65,7 +65,7 @@ trait EmbeddedAppServer extends Bootable with Logging {
config.getList("akka.rest.resource_packages").mkString(";")
)
adapter.addInitParameter("com.sun.jersey.spi.container.ResourceFilters",
config.getList("akka.rest.filters").mkString(",")
config.getList("akka.rest.filters").mkString(",")
)
if (HOME.isDefined) adapter.addRootFolder(HOME.get + "/deploy/root")

View file

@ -42,7 +42,7 @@ private[akka] object CassandraStorageBackend extends
case "ALL" => ConsistencyLevel.ALL
case "ANY" => ConsistencyLevel.ANY
case unknown => throw new IllegalArgumentException(
"Cassandra consistency level [" + unknown + "] is not supported." +
"Cassandra consistency level [" + unknown + "] is not supported." +
"\n\tExpected one of [ZERO, ONE, QUORUM, DCQUORUM, DCQUORUMSYNC, ALL, ANY] in the akka.conf configuration file.")
}
}
@ -105,9 +105,9 @@ private[akka] object CassandraStorageBackend extends
}
}
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) =
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) =
elements.foreach(insertVectorStorageEntryFor(name, _))
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {
val columnPath = new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family)
columnPath.setColumn(intToBytes(index))

View file

@ -171,4 +171,4 @@ object EmbeddedCassandraService {
def start: Unit = {}
}
*/
*/

View file

@ -30,7 +30,7 @@ class StorageException(message: String) extends RuntimeException(message)
* <pre>
* val myMap = CassandraStorage.getMap(id)
* </pre>
*
*
* Example Java usage:
* <pre>
* PersistentMap<Object, Object> myMap = MongoStorage.newMap();
@ -72,7 +72,7 @@ trait Storage {
}
/**
* Implementation of <tt>PersistentMap</tt> for every concrete
* Implementation of <tt>PersistentMap</tt> for every concrete
* storage will have the same workflow. This abstracts the workflow.
*
* Subclasses just need to provide the actual concrete instance for the
@ -117,23 +117,23 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
put(key, value)
this
}
override def put(key: K, value: V): Option[V] = {
register
newAndUpdatedEntries.put(key, value)
}
override def update(key: K, value: V) = {
override def update(key: K, value: V) = {
register
newAndUpdatedEntries.update(key, value)
}
override def remove(key: K) = {
register
removedEntries.add(key)
newAndUpdatedEntries.get(key)
}
def slice(start: Option[K], count: Int): List[Tuple2[K, V]] =
slice(start, None, count)
@ -141,11 +141,11 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
storage.getMapStorageRangeFor(uuid, start, finish, count)
} catch { case e: Exception => Nil }
override def clear = {
override def clear = {
register
shouldClearOnCommit.swap(true)
}
override def contains(key: K): Boolean = try {
newAndUpdatedEntries.contains(key) ||
storage.getMapStorageEntryFor(uuid, key).isDefined
@ -163,9 +163,9 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
storage.getMapStorageEntryFor(uuid, key)
} catch { case e: Exception => None }
}
def iterator = elements
override def elements: Iterator[Tuple2[K, V]] = {
new Iterator[Tuple2[K, V]] {
private val originalList: List[Tuple2[K, V]] = try {
@ -173,10 +173,10 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
} catch {
case e: Throwable => Nil
}
private var elements = newAndUpdatedEntries.toList union originalList.reverse
private var elements = newAndUpdatedEntries.toList union originalList.reverse
override def next: Tuple2[K, V]= synchronized {
val element = elements.head
elements = elements.tail
elements = elements.tail
element
}
override def hasNext: Boolean = synchronized { !elements.isEmpty }
@ -217,12 +217,12 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
}
def +(elem: T) = add(elem)
def add(elem: T) = {
register
newElems + elem
}
def apply(index: Int): T = get(index)
def get(index: Int): T = {
@ -231,7 +231,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
}
override def slice(start: Int, finish: Int): IndexedSeq[T] = slice(Some(start), Some(finish))
def slice(start: Option[Int], finish: Option[Int], count: Int = 0): IndexedSeq[T] = {
val buffer = new scala.collection.mutable.ArrayBuffer[T]
storage.getVectorStorageRangeFor(uuid, start, finish, count).foreach(buffer.append(_))
@ -277,21 +277,21 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
*/
trait PersistentRef[T] extends Transactional with Committable with Abortable {
protected val ref = new TransactionalRef[T]
val storage: RefStorageBackend[T]
def commit = if (ref.isDefined) {
storage.insertRefStorageFor(uuid, ref.get.get)
ref.swap(null.asInstanceOf[T])
ref.swap(null.asInstanceOf[T])
}
def abort = ref.swap(null.asInstanceOf[T])
def abort = ref.swap(null.asInstanceOf[T])
def swap(elem: T) = {
register
ref.swap(elem)
}
def get: Option[T] = if (ref.isDefined) ref.get else storage.getRefStorageFor(uuid)
def isDefined: Boolean = ref.isDefined || storage.getRefStorageFor(uuid).isDefined
@ -309,7 +309,7 @@ trait PersistentRef[T] extends Transactional with Committable with Abortable {
}
/**
* Implementation of <tt>PersistentQueue</tt> for every concrete
* Implementation of <tt>PersistentQueue</tt> for every concrete
* storage will have the same workflow. This abstracts the workflow.
* <p/>
* Enqueue is simpler, we just have to record the operation in a local
@ -410,13 +410,13 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
}
}
override def clear = {
override def clear = {
register
shouldClearOnCommit.swap(true)
localQ.swap(Queue.empty)
pickMeForDQ.swap(0)
}
override def size: Int = try {
storage.size(uuid) + localQ.get.get.length
} catch { case e: Exception => 0 }
@ -424,11 +424,11 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
override def isEmpty: Boolean =
size == 0
override def +=(elem: A) = {
override def +=(elem: A) = {
enqueue(elem)
this
}
def ++=(elems: Iterator[A]) = {
def ++=(elems: Iterator[A]) = {
enqueue(elems.toList: _*)
this
}
@ -450,7 +450,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
* Implements a template for a concrete persistent transactional sorted set based storage.
* <p/>
* Sorting is done based on a <i>zscore</i>. But the computation of zscore has been kept
* outside the abstraction.
* outside the abstraction.
* <p/>
* zscore can be implemented in a variety of ways by the calling class:
* <pre>
@ -467,7 +467,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
* class Foo {
* //..
* }
*
*
* implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable {
* def toZScore = {
* //..
@ -526,7 +526,7 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab
}
}
}
def size: Int = newElems.size + storage.zcard(uuid) - removedElems.size
def zscore(elem: A): Float = {
@ -541,9 +541,9 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab
implicit def order(x: (A, Float)) = new Ordered[(A, Float)] {
def compare(that: (A, Float)) = x._2 compare that._2
}
implicit def ordering = new scala.math.Ordering[(A,Float)] {
def compare(x: (A, Float),y : (A,Float)) = x._2 compare y._2
def compare(x: (A, Float),y : (A,Float)) = x._2 compare y._2
}
@ -556,7 +556,7 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab
// -1 means the last element, -2 means the second last
val s = if (start < 0) start + l else start
val e =
val e =
if (end < 0) end + l
else if (end >= l) (l - 1)
else end

View file

@ -26,7 +26,7 @@ trait VectorStorageBackend[T] extends StorageBackend {
def updateVectorStorageEntryFor(name: String, index: Int, elem: T)
def getVectorStorageEntryFor(name: String, index: Int): T
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[T]
def getVectorStorageSizeFor(name: String): Int
def getVectorStorageSizeFor(name: String): Int
}
// for Ref
@ -47,17 +47,17 @@ trait RefStorageBackend[T] extends StorageBackend {
trait QueueStorageBackend[T] extends StorageBackend {
// add to the end of the queue
def enqueue(name: String, item: T): Boolean
// pop from the front of the queue
def dequeue(name: String): Option[T]
// get the size of the queue
def size(name: String): Int
// return an array of items currently stored in the queue
// start is the item to begin, count is how many items to return
def peek(name: String, start: Int, count: Int): List[T]
// completely delete the queue
def remove(name: String): Boolean
}
@ -65,19 +65,19 @@ trait QueueStorageBackend[T] extends StorageBackend {
trait SortedSetStorageBackend[T] extends StorageBackend {
// add item to sorted set identified by name
def zadd(name: String, zscore: String, item: T): Boolean
// remove item from sorted set identified by name
def zrem(name: String, item: T): Boolean
// cardinality of the set identified by name
def zcard(name: String): Int
// zscore of the item from sorted set identified by name
def zscore(name: String, item: T): Option[Float]
// zrange from the sorted set identified by name
def zrange(name: String, start: Int, end: Int): List[T]
// zrange with score from the sorted set identified by name
def zrangeWithScore(name: String, start: Int, end: Int): List[(T, Float)]
def zrangeWithScore(name: String, start: Int, end: Int): List[(T, Float)]
}

View file

@ -280,7 +280,7 @@ private[akka] object MongoStorageBackend extends
}
val currentList = dbobj.get(VALUE).asInstanceOf[JArrayList[AnyRef]]
currentList.set(index, serializer.out(elem))
coll.update(q,
coll.update(q,
new BasicDBObject().append(KEY, name).append(VALUE, currentList))
}

View file

@ -87,7 +87,7 @@ object World {
pingEvery(evaporator, EvapMillis)
}
private def pingEvery(actor: ActorRef, millis: Long) =
private def pingEvery(actor: ActorRef, millis: Long) =
Scheduler.schedule(actor, "ping", Config.StartDelay, millis, TimeUnit.MILLISECONDS)
}

View file

@ -26,4 +26,4 @@ object Application1 {
println(actor2 !! Message("actor2"))
}
}
}

View file

@ -19,4 +19,4 @@ object Application2 {
RemoteNode.start("localhost", 7777)
RemoteNode.register("remote2", actorOf[RemoteActor2].start)
}
}
}

View file

@ -74,4 +74,4 @@ class CustomRouteBuilder extends RouteBuilder {
}
})
}
}
}

View file

@ -18,8 +18,8 @@ Then to run the sample:
- Run 'sbt console' to start up a REPL (interpreter).
4. In the first REPL you get execute:
- scala> import sample.chat._
- scala> import se.scalablesolutions.akka.actor.Actor._
- scala> val chatService = actorOf[ChatService].start
- scala> import se.scalablesolutions.akka.actor.Actor
- scala> val chatService = Actor.actorOf[ChatService].start
5. In the second REPL you get execute:
- scala> import sample.chat._
- scala> Runner.run

View file

@ -27,17 +27,17 @@ First we need to download, build and start up Redis:
4. Run: ./redis-server.
For details on how to set up Redis server have a look at http://code.google.com/p/redis/wiki/QuickStart.
Then to run the sample:
Then to run the sample:
1. Fire up two shells. For each of them:
- Step down into to the root of the Akka distribution.
- Set 'export AKKA_HOME=<root of distribution>.
- Run 'sbt console' to start up a REPL (interpreter).
2. In the first REPL you get execute:
2. In the first REPL you get execute:
- scala> import sample.chat._
- scala> import se.scalablesolutions.akka.actor.Actor._
- scala> val chatService = actorOf[ChatService].start
3. In the second REPL you get execute:
3. In the second REPL you get execute:
- scala> import sample.chat._
- scala> Runner.run
4. See the chat simulation run.
@ -60,12 +60,12 @@ case class ChatMessage(from: String, message: String) extends Event
/**
* Chat client.
*/
class ChatClient(val name: String) {
class ChatClient(val name: String) {
val chat = RemoteClient.actorFor("chat:service", "localhost", 9999)
def login = chat ! Login(name)
def logout = chat ! Logout(name)
def post(message: String) = chat ! ChatMessage(name, name + ": " + message)
def login = chat ! Login(name)
def logout = chat ! Logout(name)
def post(message: String) = chat ! ChatMessage(name, name + ": " + message)
def chatLog: ChatLog = (chat !! GetChatLog(name)).getOrElse(throw new Exception("Couldn't get the chat log from ChatServer"))
}
@ -75,15 +75,15 @@ class ChatClient(val name: String) {
class Session(user: String, storage: ActorRef) extends Actor {
private val loginTime = System.currentTimeMillis
private var userLog: List[String] = Nil
log.info("New session for user [%s] has been created at [%s]", user, loginTime)
def receive = {
case msg @ ChatMessage(from, message) =>
case msg @ ChatMessage(from, message) =>
userLog ::= message
storage ! msg
case msg @ GetChatLog(_) =>
case msg @ GetChatLog(_) =>
storage forward msg
}
}
@ -97,24 +97,24 @@ trait ChatStorage extends Actor
* Redis-backed chat storage implementation.
*/
class RedisChatStorage extends ChatStorage {
self.lifeCycle = Some(LifeCycle(Permanent))
self.lifeCycle = Some(LifeCycle(Permanent))
val CHAT_LOG = "akka.chat.log"
private var chatLog = atomic { RedisStorage.getVector(CHAT_LOG) }
log.info("Redis-based chat storage is starting up...")
def receive = {
case msg @ ChatMessage(from, message) =>
case msg @ ChatMessage(from, message) =>
log.debug("New chat message [%s]", message)
atomic { chatLog + message.getBytes("UTF-8") }
case GetChatLog(_) =>
case GetChatLog(_) =>
val messageList = atomic { chatLog.map(bytes => new String(bytes, "UTF-8")).toList }
self.reply(ChatLog(messageList))
}
override def postRestart(reason: Throwable) = chatLog = RedisStorage.getVector(CHAT_LOG)
override def postRestart(reason: Throwable) = chatLog = RedisStorage.getVector(CHAT_LOG)
}
/**
@ -122,27 +122,27 @@ class RedisChatStorage extends ChatStorage {
* <p/>
* Uses self-type annotation (this: Actor =>) to declare that it needs to be mixed in with an Actor.
*/
trait SessionManagement { this: Actor =>
trait SessionManagement { this: Actor =>
val storage: ActorRef // needs someone to provide the ChatStorage
val sessions = new HashMap[String, ActorRef]
protected def sessionManagement: Receive = {
case Login(username) =>
case Login(username) =>
log.info("User [%s] has logged in", username)
val session = actorOf(new Session(username, storage))
session.start
sessions += (username -> session)
case Logout(username) =>
case Logout(username) =>
log.info("User [%s] has logged out", username)
val session = sessions(username)
session.stop
sessions -= username
}
protected def shutdownSessions =
sessions.foreach { case (_, session) => session.stop }
sessions -= username
}
protected def shutdownSessions =
sessions.foreach { case (_, session) => session.stop }
}
/**
@ -152,7 +152,7 @@ trait SessionManagement { this: Actor =>
*/
trait ChatManagement { this: Actor =>
val sessions: HashMap[String, ActorRef] // needs someone to provide the Session map
protected def chatManagement: Receive = {
case msg @ ChatMessage(from, _) => sessions(from) ! msg
case msg @ GetChatLog(from) => sessions(from) forward msg
@ -172,20 +172,20 @@ trait RedisChatStorageFactory { this: Actor =>
trait ChatServer extends Actor {
self.faultHandler = Some(OneForOneStrategy(5, 5000))
self.trapExit = List(classOf[Exception])
val storage: ActorRef
log.info("Chat server is starting up...")
// actor message handler
def receive = sessionManagement orElse chatManagement
// abstract methods to be defined somewhere else
protected def chatManagement: Receive
protected def sessionManagement: Receive
protected def sessionManagement: Receive
protected def shutdownSessions: Unit
override def shutdown = {
override def shutdown = {
log.info("Chat server is shutting down...")
shutdownSessions
self.unlink(storage)
@ -200,10 +200,10 @@ trait ChatServer extends Actor {
* val chatService = Actor.actorOf[ChatService].start
* </pre>
*/
class ChatService extends
ChatServer with
SessionManagement with
ChatManagement with
class ChatService extends
ChatServer with
SessionManagement with
ChatManagement with
RedisChatStorageFactory {
override def init = {
RemoteNode.start("localhost", 9999)
@ -217,7 +217,7 @@ class ChatService extends
object Runner {
def run = {
val client = new ChatClient("jonas")
client.login
client.post("Hi there")
@ -228,4 +228,4 @@ object Runner {
client.logout
}
}
}

View file

@ -23,7 +23,7 @@ class Boot extends Logging {
def boot {
// where to search snippet
LiftRules.addToPackages("sample.lift")
LiftRules.httpAuthProtectedResource.prepend {
case (Req("liftcount" :: Nil, _, _)) => Full(AuthRole("admin"))
}
@ -35,9 +35,9 @@ class Boot extends Logging {
true
}
}
LiftRules.passNotFoundToChain = true
val factory = SupervisorFactory(
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
@ -49,7 +49,7 @@ class Boot extends Logging {
LifeCycle(Permanent)) ::
Nil))
factory.newInstance.start
// Build SiteMap
// val entries = Menu(Loc("Home", List("index"), "Home")) :: Nil
// LiftRules.setSiteMap(SiteMap(entries:_*))

View file

@ -13,4 +13,4 @@ object LiftConsole {
exit(0)
}
}
*/
*/

View file

@ -10,7 +10,7 @@ import se.scalablesolutions.akka.actor.Actor._
/**
* Sample Akka application for Redis PubSub
*
*
* Prerequisite: Need Redis Server running (the version that supports pubsub)
* <pre>
* 1. Download redis from http://github.com/antirez/redis
@ -65,7 +65,7 @@ object Sub {
val r = new RedisClient("localhost", 6379)
val s = actorOf(new Subscriber(r))
s.start
s ! Register(callback)
s ! Register(callback)
def sub(channels: String*) = {
s ! Subscribe(channels.toArray)
@ -78,29 +78,29 @@ object Sub {
def callback(pubsub: PubSubMessage) = pubsub match {
case S(channel, no) => println("subscribed to " + channel + " and count = " + no)
case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no)
case M(channel, msg) =>
case M(channel, msg) =>
msg match {
// exit will unsubscribe from all channels and stop subscription service
case "exit" =>
case "exit" =>
println("unsubscribe all ..")
r.unsubscribe
// message "+x" will subscribe to channel x
case x if x startsWith "+" =>
case x if x startsWith "+" =>
val s: Seq[Char] = x
s match {
case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => }
}
// message "-x" will unsubscribe from channel x
case x if x startsWith "-" =>
case x if x startsWith "-" =>
val s: Seq[Char] = x
s match {
case Seq('-', rest @ _*) => r.unsubscribe(rest.toString)
}
// other message receive
case x =>
case x =>
println("received message on channel " + channel + " as : " + x)
}
}

View file

@ -11,7 +11,7 @@ import se.scalablesolutions.akka.util.Logging
class RemoteHelloWorldActor extends RemoteActor("localhost", 9999) {
def receive = {
case "Hello" =>
case "Hello" =>
log.info("Received 'Hello'")
self.reply("World")
}
@ -27,7 +27,7 @@ object ClientManagedRemoteActorServer extends Logging {
}
object ClientManagedRemoteActorClient extends Logging {
def run = {
val actor = actorOf[RemoteHelloWorldActor].start
log.info("Remote actor created, moved to the server")

View file

@ -11,7 +11,7 @@ import se.scalablesolutions.akka.util.Logging
class HelloWorldActor extends Actor {
def receive = {
case "Hello" =>
case "Hello" =>
log.info("Received 'Hello'")
self.reply("World")
}
@ -30,7 +30,7 @@ object ServerManagedRemoteActorServer extends Logging {
}
object ServerManagedRemoteActorClient extends Logging {
def run = {
val actor = RemoteClient.actorFor("hello-service", "localhost", 9999)
log.info("Remote client created")

View file

@ -1,13 +1,16 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package sample.rest.java;
import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import static se.scalablesolutions.akka.config.JavaConfig.*;
public class Boot {
final private ActiveObjectConfigurator manager = new ActiveObjectConfigurator();
public Boot() throws Exception {
manager.configure(
public final static ActiveObjectConfigurator configurator = new ActiveObjectConfigurator();
static {
configurator.configure(
new RestartStrategy(new OneForOne(), 3, 5000, new Class[]{Exception.class}),
new Component[] {
new Component(
@ -19,5 +22,5 @@ public class Boot {
new LifeCycle(new Permanent()),
1000)
}).supervise();
}
}
}

View file

@ -4,10 +4,6 @@
package sample.rest.java;
import javax.ws.rs.Path;
import javax.ws.rs.GET;
import javax.ws.rs.Produces;
import se.scalablesolutions.akka.actor.annotation.transactionrequired;
import se.scalablesolutions.akka.actor.annotation.prerestart;
import se.scalablesolutions.akka.actor.annotation.postrestart;
@ -16,14 +12,6 @@ import se.scalablesolutions.akka.persistence.cassandra.CassandraStorage;
import java.nio.ByteBuffer;
/**
* Try service out by invoking (multiple times):
* <pre>
* curl http://localhost:9998/persistentjavacount
* </pre>
* Or browse to the URL from a web browser.
*/
@Path("/persistentjavacount")
@transactionrequired
public class PersistentSimpleService {
private String KEY = "COUNTER";
@ -31,8 +19,6 @@ public class PersistentSimpleService {
private boolean hasStartedTicking = false;
private PersistentMap<byte[], byte[]> storage;
@GET
@Produces({"application/html"})
public String count() {
if (storage == null) storage = CassandraStorage.newMap();
if (!hasStartedTicking) {

View file

@ -0,0 +1,27 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package sample.rest.java;
import javax.ws.rs.Path;
import javax.ws.rs.GET;
import javax.ws.rs.Produces;
/**
* Try service out by invoking (multiple times):
* <pre>
* curl http://localhost:9998/persistentjavacount
* </pre>
* Or browse to the URL from a web browser.
*/
@Path("/persistentjavacount")
public class PersistentSimpleServiceRest {
private PersistentSimpleService service = (PersistentSimpleService) Boot.configurator.getInstance(PersistentSimpleService.class);
@GET
@Produces({"application/json"})
public String count() {
return service.count();
}
}

View file

@ -4,10 +4,6 @@
package sample.rest.java;
import javax.ws.rs.Path;
import javax.ws.rs.GET;
import javax.ws.rs.Produces;
import se.scalablesolutions.akka.actor.ActiveObject;
import se.scalablesolutions.akka.actor.ActiveObjectContext;
import se.scalablesolutions.akka.actor.annotation.transactionrequired;
@ -16,14 +12,6 @@ import se.scalablesolutions.akka.actor.annotation.postrestart;
import se.scalablesolutions.akka.stm.TransactionalState;
import se.scalablesolutions.akka.stm.TransactionalMap;
/**
* Try service out by invoking (multiple times):
* <pre>
* curl http://localhost:9998/javacount
* </pre>
* Or browse to the URL from a web browser.
*/
@Path("/javacount")
@transactionrequired
public class SimpleService {
private String KEY = "COUNTER";
@ -32,8 +20,6 @@ public class SimpleService {
private TransactionalMap<String, Integer> storage;
private Receiver receiver = ActiveObject.newInstance(Receiver.class);
@GET
@Produces({"application/json"})
public String count() {
if (storage == null) storage = TransactionalState.newMap();
if (!hasStartedTicking) {

View file

@ -0,0 +1,27 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package sample.rest.java;
import javax.ws.rs.Path;
import javax.ws.rs.GET;
import javax.ws.rs.Produces;
/**
* Try service out by invoking (multiple times):
* <pre>
* curl http://localhost:9998/javacount
* </pre>
* Or browse to the URL from a web browser.
*/
@Path("/javacount")
public class SimpleServiceRest {
private SimpleService service = (SimpleService) Boot.configurator.getInstance(SimpleService.class);
@GET
@Produces({"application/json"})
public String count() {
return service.count();
}
}

View file

@ -51,14 +51,15 @@ class SimpleService {
@GET
@Produces(Array("text/html"))
def count = {
//Fetch the first actor of type SimpleServiceActor
//Send it the "Tick" message and expect a NdeSeq back
val result = for{a <- actorsFor(classOf[SimpleServiceActor]).headOption
r <- a.!![NodeSeq]("Tick")} yield r
//Return either the resulting NodeSeq or a default one
result getOrElse <error>Error in counter</error>
//Fetch the first actor of type SimpleServiceActor
//Send it the "Tick" message and expect a NodeSeq back
val result = for{a <- actorsFor(classOf[SimpleServiceActor]).headOption
r <- a.!![NodeSeq]("Tick")} yield r
//Return either the resulting NodeSeq or a default one
result getOrElse <error>Error in counter</error>
}
}
class SimpleServiceActor extends Transactor {
private val KEY = "COUNTER"
private var hasStartedTicking = false
@ -105,12 +106,12 @@ class PersistentSimpleService {
@GET
@Produces(Array("text/html"))
def count = {
//Fetch the first actor of type PersistentSimpleServiceActor
//Send it the "Tick" message and expect a NdeSeq back
val result = for{a <- actorsFor(classOf[PersistentSimpleServiceActor]).headOption
r <- a.!![NodeSeq]("Tick")} yield r
//Return either the resulting NodeSeq or a default one
result getOrElse <error>Error in counter</error>
//Fetch the first actor of type PersistentSimpleServiceActor
//Send it the "Tick" message and expect a NodeSeq back
val result = for{a <- actorsFor(classOf[PersistentSimpleServiceActor]).headOption
r <- a.!![NodeSeq]("Tick")} yield r
//Return either the resulting NodeSeq or a default one
result getOrElse <error>Error in counter</error>
}
}
@ -147,18 +148,18 @@ class Chat {
@Consumes(Array("application/x-www-form-urlencoded"))
@Produces(Array("text/html"))
def publishMessage(form: MultivaluedMap[String, String]) = {
val msg = ChatMsg(form.getFirst("name"),form.getFirst("action"),form.getFirst("message"))
val msg = ChatMsg(form.getFirst("name"),form.getFirst("action"),form.getFirst("message"))
//Fetch the first actor of type ChatActor
//Send it the "Tick" message and expect a NdeSeq back
val result = for{a <- actorsFor(classOf[ChatActor]).headOption
r <- a.!![String](msg)} yield r
//Return either the resulting String or a default one
result getOrElse "System__error"
//Send it the "Tick" message and expect a NodeSeq back
val result = for{a <- actorsFor(classOf[ChatActor]).headOption
r <- a.!![String](msg)} yield r
//Return either the resulting String or a default one
result getOrElse "System__error"
}
}
object ChatActor {
case class ChatMsg(val who: String, val what: String, val msg: String)
case class ChatMsg(val who: String, val what: String, val msg: String)
}
class ChatActor extends Actor with Logging {

View file

@ -120,12 +120,12 @@ class SecureTickService {
def paranoiaTick = tick
def tick = {
//Fetch the first actor of type PersistentSimpleServiceActor
//Send it the "Tick" message and expect a NdeSeq back
val result = for{a <- actorsFor(classOf[SecureTickActor]).headOption
r <- a.!![Integer]("Tick")} yield r
//Return either the resulting NodeSeq or a default one
result match {
//Fetch the first actor of type PersistentSimpleServiceActor
//Send it the "Tick" message and expect a NdeSeq back
val result = for{a <- actorsFor(classOf[SecureTickActor]).headOption
r <- a.!![Integer]("Tick")} yield r
//Return either the resulting NodeSeq or a default one
result match {
case (Some(counter)) => (<success>Tick: {counter}</success>)
case _ => (<error>Error in counter</error>)
}
@ -147,4 +147,4 @@ class SecureTickActor extends Transactor with Logging {
self.reply(new Integer(0))
}
}
}
}

View file

@ -1,10 +1,10 @@
####################
# Akka Config File #
####################
# This file has all the default settings, so all these could be removed with no visible effect.
# Modify as needed.
<log>
filename = "./logs/akka.log"
roll = "daily" # Options: never, hourly, daily, sunday/monday/...
@ -13,25 +13,22 @@
# syslog_host = ""
# syslog_server_name = ""
</log>
<akka>
version = "0.9"
# FQN (Fully Qualified Name) to the class doing initial active object/actor
# supervisor bootstrap, should be defined in default constructor
boot = ["sample.camel.Boot",
"sample.rest.java.Boot",
"sample.rest.java.Boot",
"sample.rest.scala.Boot",
"sample.security.Boot"]
<actor>
timeout = 5000 # default timeout for future based invocations
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
</actor>
<dispatcher>
timeout = 5000 # default timeout for future based invocations
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
throughput = 5 # default throughput for ExecutorBasedEventDrivenDispatcher
</dispatcher>
</actor>
<stm>
service = on
@ -41,10 +38,10 @@
jta-aware = off # 'on' means that if there JTA Transaction Manager available then the STM will
# begin (or join), commit or rollback the JTA transaction. Default is 'off'.
</stm>
<jta>
provider = "from-jndi" # Options: "from-jndi" (means that Akka will try to detect a TransactionManager in the JNDI)
# "atomikos" (means that Akka will use the Atomikos based JTA impl in 'akka-jta',
provider = "from-jndi" # Options: "from-jndi" (means that Akka will try to detect a TransactionManager in the JNDI)
# "atomikos" (means that Akka will use the Atomikos based JTA impl in 'akka-jta',
# e.g. you need the akka-jta JARs on classpath).
timeout = 60000
</jta>
@ -56,7 +53,7 @@
filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use
resource_packages = ["sample.rest.scala","sample.rest.java","sample.security"] # List with all resource packages for your Jersey services
authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now)
#IF you are using a KerberosAuthenticationActor
# <kerberos>
# servicePrincipal = "HTTP/localhost@EXAMPLE.COM"
@ -75,8 +72,8 @@
name = "default" # The name of the cluster
serializer = "se.scalablesolutions.akka.serialization.Serializer$Java$" # FQN of the serializer class
</cluster>
<server>
<server>
service = on
hostname = "localhost"
port = 9999
@ -88,14 +85,14 @@
read-timeout = 10000 # in millis (10 sec default)
</client>
</remote>
<storage>
<cassandra>
hostname = "127.0.0.1" # IP address or hostname of one of the Cassandra cluster's seeds
port = 9160
consistency-level = "QUORUM" # Options: ZERO, ONE, QUORUM, DCQUORUM, DCQUORUMSYNC, ALL, ANY
</cassandra>
<mongodb>
hostname = "127.0.0.1" # IP address or hostname of the MongoDB DB instance
port = 27017

View file

@ -1,20 +1,20 @@
# for production, you should probably set the root to INFO
# and the pattern to %c instead of %l. (%l is slower.)
# output messages into a rolling log file as well as stdout
log4j.rootLogger=INFO,stdout,R
# stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.SimpleLayout
# rolling log file ("system.log
log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
log4j.appender.R.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
# Edit the next line to point to your logs directory
log4j.appender.R.File=./logs/akka.log
log4j.logger.org.atmosphere=DEBUG
# for production, you should probably set the root to INFO
# and the pattern to %c instead of %l. (%l is slower.)
# output messages into a rolling log file as well as stdout
log4j.rootLogger=INFO,stdout,R
# stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.SimpleLayout
# rolling log file ("system.log
log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
log4j.appender.R.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
# Edit the next line to point to your logs directory
log4j.appender.R.File=./logs/akka.log
log4j.logger.org.atmosphere=DEBUG

View file

@ -21,15 +21,15 @@
<!-- Basic Configuration -->
<!--======================================================================-->
<!--
<!--
~ The name of this cluster. This is mainly used to prevent machines in
~ one logical cluster from joining another.
-->
<ClusterName>akka</ClusterName>
<!--
~ Turn on to make new [non-seed] nodes automatically migrate the right data
~ to themselves. (If no InitialToken is specified, they will pick one
~ Turn on to make new [non-seed] nodes automatically migrate the right data
~ to themselves. (If no InitialToken is specified, they will pick one
~ such that they will get half the range of the most-loaded node.)
~ If a node starts up without bootstrapping, it will mark itself bootstrapped
~ so that you can't subsequently accidently bootstrap a node with
@ -66,11 +66,11 @@
~ and LongType. You can also specify the fully-qualified class
~ name to a class of your choice extending
~ org.apache.cassandra.db.marshal.AbstractType.
~
~
~ SuperColumns have a similar CompareSubcolumnsWith attribute.
~
~
~ BytesType: Simple sort by byte value. No validation is performed.
~ AsciiType: Like BytesType, but validates that the input can be
~ AsciiType: Like BytesType, but validates that the input can be
~ parsed as US-ASCII.
~ UTF8Type: A string encoded as UTF8
~ LongType: A 64bit long
@ -82,7 +82,7 @@
~
~ An optional `Comment` attribute may be used to attach additional
~ human-readable information about the column family to its definition.
~
~
~ The optional KeysCached attribute specifies
~ the number of keys per sstable whose locations we keep in
~ memory in "mostly LRU" order. (JUST the key locations, NOT any
@ -94,25 +94,25 @@
~ whose entire contents we cache in memory. Do not use this on
~ ColumnFamilies with large rows, or ColumnFamilies with high write:read
~ ratios. Specify a fraction (value less than 1), a percentage (ending in
~ a % sign) or an absolute number of rows to cache.
~ a % sign) or an absolute number of rows to cache.
~ RowsCached defaults to 0, i.e., row cache is off by default.
~
~ Remember, when using caches as a percentage, they WILL grow with
~ your data set!
-->
<ColumnFamily Name="map"
CompareWith="UTF8Type"
<ColumnFamily Name="map"
CompareWith="UTF8Type"
KeysCached="100%" />
<!-- FIXME: change vector to a super column -->
<ColumnFamily Name="vector"
CompareWith="UTF8Type"
<ColumnFamily Name="vector"
CompareWith="UTF8Type"
KeysCached="100%" />
<ColumnFamily Name="ref"
CompareWith="UTF8Type"
<ColumnFamily Name="ref"
CompareWith="UTF8Type"
KeysCached="100%" />
<!--ColumnFamily Name="Standard1" CompareWith="BytesType"/>
<ColumnFamily Name="Standard2"
<ColumnFamily Name="Standard2"
CompareWith="UTF8Type"
KeysCached="100%"/>
<ColumnFamily Name="StandardByUUID1" CompareWith="TimeUUIDType" />
@ -150,7 +150,7 @@
~ and PropertyFileEndPointSnitch is available in contrib/.
-->
<EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
</Keyspace>
</Keyspaces>
@ -158,7 +158,7 @@
~ Authenticator: any IAuthenticator may be used, including your own as long
~ as it is on the classpath. Out of the box, Cassandra provides
~ org.apache.cassandra.auth.AllowAllAuthenticator and,
~ org.apache.cassandra.auth.SimpleAuthenticator
~ org.apache.cassandra.auth.SimpleAuthenticator
~ (SimpleAuthenticator uses access.properties and passwd.properties by
~ default).
~
@ -188,7 +188,7 @@
~ are sent to the node with the "closest" token, so distributing your
~ tokens equally along the key distribution space will spread keys
~ evenly across your cluster.) This setting is only checked the first
~ time a node is started.
~ time a node is started.
~ This can also be useful with RandomPartitioner to force equal spacing
~ of tokens around the hash space, especially for clusters with a small
@ -227,9 +227,9 @@
<!-- Local hosts and ports -->
<!--
<!--
~ Address to bind to and tell other nodes to connect to. You _must_
~ change this if you want multiple nodes to be able to communicate!
~ change this if you want multiple nodes to be able to communicate!
~
~ Leaving it blank leaves it up to InetAddress.getLocalHost(). This
~ will always do the Right Thing *if* the node is properly configured
@ -251,9 +251,9 @@
<ThriftAddress>localhost</ThriftAddress>
<!-- Thrift RPC port (the port clients connect to). -->
<ThriftPort>9160</ThriftPort>
<!--
<!--
~ Whether or not to use a framed transport for Thrift. If this option
~ is set to true then you must also use a framed transport on the
~ is set to true then you must also use a framed transport on the
~ client-side, (framed and non-framed transports are not compatible).
-->
<ThriftFramedTransport>false</ThriftFramedTransport>
@ -285,16 +285,16 @@
<!--
~ Buffer size to use when performing contiguous column slices. Increase
~ this to the size of the column slices you typically perform.
~ (Name-based queries are performed with a buffer size of
~ this to the size of the column slices you typically perform.
~ (Name-based queries are performed with a buffer size of
~ ColumnIndexSizeInKB.)
-->
<SlicedBufferSizeInKB>64</SlicedBufferSizeInKB>
<!--
~ Buffer size to use when flushing memtables to disk. (Only one
~ Buffer size to use when flushing memtables to disk. (Only one
~ memtable is ever flushed at a time.) Increase (decrease) the index
~ buffer size relative to the data buffer if you have few (many)
~ buffer size relative to the data buffer if you have few (many)
~ columns per key. Bigger is only better _if_ your memtables get large
~ enough to use the space. (Check in your data directory after your
~ app has been running long enough.) -->
@ -314,7 +314,7 @@
<!--
~ Flush memtable after this much data has been inserted, including
~ overwritten data. There is one memtable per column family, and
~ overwritten data. There is one memtable per column family, and
~ this threshold is based solely on the amount of data stored, not
~ actual heap memory usage (there is some overhead in indexing the
~ columns).
@ -379,7 +379,7 @@
~ individually). Reasonable values range from a minimal 0.1 to 10 or
~ even more if throughput matters more than latency.
-->
<!-- <CommitLogSyncBatchWindowInMS>1</CommitLogSyncBatchWindowInMS> -->
<!-- <CommitLogSyncBatchWindowInMS>1</CommitLogSyncBatchWindowInMS> -->
<!--
~ Time to wait before garbage-collection deletion markers. Set this to

File diff suppressed because one or more lines are too long

View file

@ -1,65 +0,0 @@
<html>
<head>
<title>Push Services Demo</title>
<script language="javascript" src="jquery-1.3.2.min.js">
</script>
<style>
.hide {
visibility: hidden;
}
</style>
</head>
<body id="body">
<div id="container">
<div id="chat">
<div id="chatwindow">
</div>
<div>
<input type="text" name="name" id="name"/>
<input type="button" name="login" value="Login" id="login"/>
<textarea id="msg" class="hide" name="message" rows="10" cols="150"></textarea>
<input type="button" name="Send" id="send" value="Send" class="hide"/>
</div>
</div>
</div>
<script language="javascript">
$(function() {
window.app = {
update : function(data) {
if (data && data.name)
$('#chatwindow').append('<p>' + data.name + (data.message ? (': ' + data.message) : '') + '</p>');
else
alert(data);
}
};
$('#send').click(function(e) {
var message = $('#msg').val();
$('#msg').val('');
$.post('/chat',
{
'action' : 'post',
'name' : $('#name').val(),
'message' : message
});
});
$('#login').click(function(e) {
$.post('/chat',
{
'action' : 'login',
'name' : $('#name').val()
},
function(data) {
$('#login').hide();
$('#name').attr('disabled', 'disabled');
$('#msg').removeClass('hide');
$('#send').removeClass('hide');
$('<iframe style="display:hidden;" id="comet" src="/chat"></iframe>').appendTo('#body');
});
});
});
</script>
</body>
</html>

View file

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-annotations</artifactId>
<version>0.6-20100604</version>
<packaging>jar</packaging>
</project>

View file

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-compat-jbossweb</artifactId>
<version>0.6-20100604</version>
<packaging>jar</packaging>
</project>

View file

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-compat-tomcat</artifactId>
<version>0.6-20100604</version>
<packaging>jar</packaging>
</project>

View file

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-compat-weblogic</artifactId>
<version>0.6-20100604</version>
<packaging>jar</packaging>
</project>

View file

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-jersey</artifactId>
<version>0.6-20100604</version>
<packaging>jar</packaging>
</project>

View file

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-runtime</artifactId>
<version>0.6-20100604</version>
<packaging>jar</packaging>
</project>

View file

@ -5,7 +5,7 @@
import sbt._
import sbt.CompileOrder._
import spde._
import java.util.jar.Attributes
import java.util.jar.Attributes.Name._
import java.io.File
@ -16,8 +16,8 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
// ------------------------------------------------------------
// project versions
val JERSEY_VERSION = "1.1.5"
val ATMO_VERSION = "0.6-SNAPSHOT"
val JERSEY_VERSION = "1.2"
val ATMO_VERSION = "0.6-20100604"
val CASSANDRA_VERSION = "0.6.1"
val LIFT_VERSION = "2.0-scala280-SNAPSHOT"
val SCALATEST_VERSION = "1.2-for-scala-2.8.0.RC3-SNAPSHOT"
@ -27,20 +27,18 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
lazy val deployPath = info.projectPath / "deploy"
lazy val distPath = info.projectPath / "dist"
val artifactQualifier = buildScalaVersion + "_osgi"
override def compileOptions = super.compileOptions ++
Seq("-deprecation",
"-Xmigration",
"-Xcheckinit",
"-Xstrict-warnings",
"-Xwarninit",
Seq("-deprecation",
"-Xmigration",
"-Xcheckinit",
"-Xstrict-warnings",
"-Xwarninit",
"-encoding", "utf8")
.map(x => CompileOption(x))
override def javaCompileOptions = JavaCompileOption("-Xlint:unchecked") :: super.javaCompileOptions.toList
def distName = "%s_%s-%s.zip".format(name, artifactQualifier, version)
def distName = "%s_%s-%s.zip".format(name, buildScalaVersion, version)
lazy val dist = zipTask(allArtifacts, "dist", distName) dependsOn (`package`) describedAs("Zips up the distribution.")
@ -67,17 +65,9 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", javaNetRepo)
val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", javaNetRepo)
val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", javaNetRepo)
val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", sonatypeSnapshotRepo)
// val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", sonatypeSnapshotRepo)
val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots)
/* These are not needed and can possibly be deleted.
val databinder = "DataBinder" at "http://databinder.net/repo"
// val configgy = "Configgy" at "http://www.lag.net/repo"
val codehaus = "Codehaus" at "http://repository.codehaus.org"
val codehaus_snapshots = "Codehaus Snapshots" at "http://snapshots.repository.codehaus.org"
val google = "Google" at "http://google-maven-repository.googlecode.com/svn/repository"
*/
// ------------------------------------------------------------
// project defintions
lazy val akka_core = project("akka-core", "akka-core", new AkkaCoreProject(_))
@ -90,8 +80,8 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
lazy val akka_kernel = project("akka-kernel", "akka-kernel", new AkkaKernelProject(_),
akka_core, akka_http, akka_spring, akka_camel, akka_persistence, akka_amqp)
// functional tests in java
lazy val akka_fun_test = project("akka-fun-test-java", "akka-fun-test-java", new AkkaFunTestProject(_), akka_kernel)
// active object tests in java
lazy val akka_active_object_test = project("akka-active-object-test", "akka-active-object-test", new AkkaActiveObjectTestProject(_), akka_kernel)
// examples
lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_))
@ -112,21 +102,22 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
// create a manifest with all akka jars and dependency jars on classpath
override def manifestClassPath = Some(allArtifacts.getFiles
.filter(_.getName.endsWith(".jar"))
.filter(!_.getName.contains("servlet_2.4"))
.filter(!_.getName.contains("scala-library"))
.map("lib_managed/scala_%s/compile/".format(artifactQualifier) + _.getName)
.map("lib_managed/scala_%s/compile/".format(buildScalaVersion) + _.getName)
.mkString(" ") +
" scala-library.jar" +
" dist/akka-core_%s-%s.jar".format(artifactQualifier, version) +
" dist/akka-http_%s-%s.jar".format(artifactQualifier, version) +
" dist/akka-camel_%s-%s.jar".format(artifactQualifier, version) +
" dist/akka-amqp_%s-%s.jar".format(artifactQualifier, version) +
" dist/akka-persistence-common_%s-%s.jar".format(artifactQualifier, version) +
" dist/akka-persistence-redis_%s-%s.jar".format(artifactQualifier, version) +
" dist/akka-persistence-mongo_%s-%s.jar".format(artifactQualifier, version) +
" dist/akka-persistence-cassandra_%s-%s.jar".format(artifactQualifier, version) +
" dist/akka-kernel_%s-%s.jar".format(artifactQualifier, version) +
" dist/akka-spring_%s-%s.jar".format(artifactQualifier, version) +
" dist/akka-jta_%s-%s.jar".format(artifactQualifier, version)
" dist/akka-core_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-http_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-camel_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-amqp_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-persistence-common_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-persistence-redis_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-persistence-mongo_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-persistence-cassandra_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-kernel_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-spring_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-jta_%s-%s.jar".format(buildScalaVersion, version)
)
//Exclude slf4j1.5.11 from the classpath, it's conflicting...
@ -162,21 +153,21 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
<distribution>repo</distribution>
</license>
</licenses>
// publish to local mvn
import Process._
lazy val publishLocalMvn = runMvnInstall
lazy val publishLocalMvn = runMvnInstall
def runMvnInstall = task {
for(absPath <- akkaArtifacts.getPaths) {
for (absPath <- akkaArtifacts.getPaths) {
val artifactRE = """(.*)/dist/(.*)-(.*).jar""".r
val artifactRE(path, artifactId, artifactVersion) = absPath
val command = "mvn install:install-file" +
val artifactRE(path, artifactId, artifactVersion) = absPath
val command = "mvn install:install-file" +
" -Dfile=" + absPath +
" -DgroupId=se.scalablesolutions.akka" +
" -DartifactId=" + artifactId +
" -DgroupId=se.scalablesolutions.akka" +
" -DartifactId=" + artifactId +
" -Dversion=" + version +
" -Dpackaging=jar -DgeneratePom=true"
command ! log
command ! log
}
None
} dependsOn(dist) describedAs("Run mvn install for artifacts in dist.")
@ -203,7 +194,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0" % "compile"
val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile"
val jgroups = "jgroups" % "jgroups" % "2.9.0.GA" % "compile"
// testing
val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test"
val junit = "junit" % "junit" % "4.5" % "test"
@ -227,11 +218,14 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val atmo = "org.atmosphere" % "atmosphere-annotations" % ATMO_VERSION % "compile"
val atmo_jersey = "org.atmosphere" % "atmosphere-jersey" % ATMO_VERSION % "compile"
val atmo_runtime = "org.atmosphere" % "atmosphere-runtime" % ATMO_VERSION % "compile"
val atmo_tomcat = "org.atmosphere" % "atmosphere-compat-tomcat" % ATMO_VERSION % "compile"
val atmo_weblogic = "org.atmosphere" % "atmosphere-compat-weblogic" % ATMO_VERSION % "compile"
val atmo_jbossweb = "org.atmosphere" % "atmosphere-compat-jbossweb" % ATMO_VERSION % "compile"
val commons_logging = "commons-logging" % "commons-logging" % "1.1.1" % "compile"
val annotation = "javax.annotation" % "jsr250-api" % "1.0" % "compile"
val lift_common = "net.liftweb" % "lift-common" % LIFT_VERSION % "compile"
val lift_util = "net.liftweb" % "lift-util" % LIFT_VERSION % "compile"
// testing
val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test"
val junit = "junit" % "junit" % "4.5" % "test"
@ -301,19 +295,12 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
}
// ================= TEST ==================
class AkkaFunTestProject(info: ProjectInfo) extends DefaultProject(info) {
val jackson_core_asl = "org.codehaus.jackson" % "jackson-core-asl" % "1.2.1" % "compile"
val stax_api = "javax.xml.stream" % "stax-api" % "1.0-2" % "compile"
val protobuf = "com.google.protobuf" % "protobuf-java" % "2.2.0" % "compile"
val grizzly = "com.sun.grizzly" % "grizzly-comet-webserver" % "1.9.18-i" % "compile"
val jersey_server = "com.sun.jersey" % "jersey-server" % JERSEY_VERSION % "compile"
val jersey_json = "com.sun.jersey" % "jersey-json" % JERSEY_VERSION % "compile"
val jersey_atom = "com.sun.jersey" % "jersey-atom" % JERSEY_VERSION % "compile"
class AkkaActiveObjectTestProject(info: ProjectInfo) extends DefaultProject(info) {
// testing
val junit = "junit" % "junit" % "4.5" % "test"
val jmock = "org.jmock" % "jmock" % "2.4.0" % "test"
}
// ================= EXAMPLES ==================
class AkkaSampleAntsProject(info: ProjectInfo) extends DefaultSpdeProject(info) {
val scalaToolsSnapshots = ScalaToolsSnapshots
@ -401,15 +388,10 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
)
}
def akkaArtifacts = {
descendents(info.projectPath / "dist", "*" + artifactQualifier + "-" + version + ".jar")
}
def akkaArtifacts = descendents(info.projectPath / "dist", "*" + buildScalaVersion + "_osgi-" + version + ".jar")
// ------------------------------------------------------------
class AkkaDefaultProject(info: ProjectInfo, val deployPath: Path) extends DefaultProject(info) with DeployProject with OSGiProject
trait DeployProject extends DefaultProject {
// defines where the deployTask copies jars to
@ -418,7 +400,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
lazy val dist = distAction
def distAction = deployTask(jarPath, packageDocsJar, packageSrcJar, deployPath, true, true, true) dependsOn(
`package`, packageDocs, packageSrc) describedAs("Deploying")
def deployTask(jar: Path, docs: Path, src: Path, toDir: Path,
def deployTask(jar: Path, docs: Path, src: Path, toDir: Path,
genJar: Boolean, genDocs: Boolean, genSource: Boolean) = task {
gen(jar, toDir, genJar, "Deploying bits") orElse
gen(docs, toDir, genDocs, "Deploying docs") orElse
@ -432,7 +414,9 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
}
trait OSGiProject extends DefaultProject with BNDPlugin {
override def artifactID = moduleID + "_osgi"
override def bndExportPackage = Set("*")
}

Binary file not shown.

View file

@ -1,2 +1,2 @@
#!/bin/sh
sed -i '' 's/[[:space:]]*$//g' **/*.*
sed -i '' 's/[[:space:]]*$//g' **/*.scala

View file

@ -1,6 +1,6 @@
#!/bin/bash
cd $AKKA_HOME
VERSION=akka_2.8.0.Beta1-0.8
VERSION=akka_2.8.0.RC3-0.9
TARGET_DIR=dist/$VERSION/$1
shift 1
VMARGS=$@
@ -8,8 +8,8 @@ VMARGS=$@
if [ -d $TARGET_DIR ]; then
cd $TARGET_DIR
else
unzip dist/${VERSION}.zip -d $TARGET_DIR
cd $TARGET_DIR
unzip dist/${VERSION}.zip -d $TARGET_DIR
cd $TARGET_DIR
fi
export AKKA_HOME=`pwd`