Merge commit 'remotes/origin/master' into 224-krasserm

Martin Krasser 2010-06-02 10:47:38 +02:00
commit 9af36d6496
79 changed files with 615 additions and 1614 deletions

.gitignore
View file

@ -1,7 +1,8 @@
*~
*#
src_managed
project/plugins/project/
activemq-data
project/plugins/project
project/boot/*
*/project/build/target
*/project/boot

View file

@ -3,24 +3,28 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<name>Akka Functional Tests in Java</name>
<artifactId>akka-fun-test-java</artifactId>
<name>Akka Active Object Tests in Java</name>
<artifactId>akka-active-object-test</artifactId>
<groupId>se.scalablesolutions.akka</groupId>
<version>0.9</version>
<packaging>jar</packaging>
<properties>
<scala.version>2.8.0.RC2</scala.version>
<atmosphere.version>0.5.2</atmosphere.version>
<jersey.version>1.1.5</jersey.version>
<grizzly.version>1.9.18-i</grizzly.version>
<scala.version>2.8.0.RC3</scala.version>
</properties>
<repositories>
<repository>
<id>embedded-repo</id>
<name>Embedded Repository</name>
<url>file://Users/jboner/src/scala/akka/embedded-repo</url>
<url>file:///Users/jboner/src/scala/akka/embedded-repo</url>
<snapshots />
</repository>
<repository>
<id>jboss</id>
<name>JBoss Repository</name>
<url>https://repository.jboss.org/nexus/content/groups/public</url>
<snapshots />
</repository>
</repositories>
@ -35,59 +39,9 @@
<dependencies>
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<artifactId>akka-kernel_2.8.0.RC2</artifactId>
<artifactId>akka-core_2.8.0.RC3</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<artifactId>akka-persistence-cassandra_2.8.0.RC2</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>com.sun.grizzly</groupId>
<artifactId>grizzly-servlet-webserver</artifactId>
<version>${grizzly.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
<version>${jersey.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
<version>${jersey.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
<version>${jersey.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-atom</artifactId>
<version>${jersey.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View file

@ -8,13 +8,9 @@ public class AllTest extends TestCase {
public static Test suite() {
TestSuite suite = new TestSuite("All Java tests");
suite.addTestSuite(InMemoryStateTest.class);
//suite.addTestSuite(InMemNestedStateTest.class);
suite.addTestSuite(InMemNestedStateTest.class);
suite.addTestSuite(RemoteInMemoryStateTest.class);
suite.addTestSuite(ActiveObjectGuiceConfiguratorTest.class);
//suite.addTestSuite(PersistentStateTest.class);
//suite.addTestSuite(PersistentNestedStateTest.class);
//suite.addTestSuite(RemotePersistentStateTest.class);
//suite.addTestSuite(RestTest.class);
return suite;
}

View file

@ -9,7 +9,6 @@ import se.scalablesolutions.akka.config.Config;
import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import static se.scalablesolutions.akka.config.JavaConfig.*;
import se.scalablesolutions.akka.actor.*;
import se.scalablesolutions.akka.kernel.Kernel;
import junit.framework.TestCase;
public class InMemNestedStateTest extends TestCase {

View file

@ -13,7 +13,6 @@ import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import static se.scalablesolutions.akka.config.JavaConfig.*;
import se.scalablesolutions.akka.actor.*;
import se.scalablesolutions.akka.kernel.Kernel;
public class InMemoryStateTest extends TestCase {
static String messageLog = "";

View file

@ -66,23 +66,23 @@ final class ActiveObjectConfiguration {
}
/**
* Holds RTTI (runtime type information) for the Active Object, e.g. the current 'sender'
* reference, the 'senderFuture' reference etc.
* <p/>
* In order to make use of this context you have to create a member field in your
* Active Object that has the type 'ActiveObjectContext', then an instance will
* be injected for you to use.
* <p/>
* This class does not contain static information but is updated by the runtime system
* at runtime.
* <p/>
* Here is an example of usage:
* <pre>
* class Ping {
* // This context will be injected, holds RTTI (runtime type information)
* // for the current message send
* private ActiveObjectContext context = null;
*
* public void hit(int count) {
* Pong pong = (Pong) context.getSender();
* pong.hit(count++)
@ -100,19 +100,19 @@ final class ActiveObjectContext {
* Returns the current sender Active Object reference.
* Scala style getter.
*/
def sender: AnyRef = {
if (_sender eq null) throw new IllegalStateException("Sender reference should not be null.")
else _sender
}
/**
* Returns the current sender Active Object reference.
* Java style getter.
*/
def getSender: AnyRef = {
if (_sender eq null) throw new IllegalStateException("Sender reference should not be null.")
else _sender
}
/**
* Returns the current sender future Active Object reference.
@ -364,7 +364,7 @@ object ActiveObject extends Logging {
proxy.asInstanceOf[T]
}
private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actorRef: ActorRef,
remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val context = injectActiveObjectContext(target)
val proxy = Proxy.newInstance(Array(intf), Array(target), false, false)
@ -462,7 +462,7 @@ object ActiveObject extends Logging {
if (parent != null) injectActiveObjectContext0(activeObject, parent)
else {
log.warning(
"Can't set 'ActiveObjectContext' for ActiveObject [%s] since no field of this type could be found.",
"Can't set 'ActiveObjectContext' for ActiveObject [%s] since no field of this type could be found.",
activeObject.getClass.getName)
None
}
@ -529,7 +529,7 @@ private[akka] sealed class ActiveObjectAspect {
remoteAddress = init.remoteAddress
timeout = init.timeout
isInitialized = true
}
dispatch(joinPoint)
}
@ -590,7 +590,7 @@ private[akka] sealed class ActiveObjectAspect {
} else future.result
private def isVoid(rtti: MethodRtti) = rtti.getMethod.getReturnType == java.lang.Void.TYPE
private def escapeArguments(args: Array[AnyRef]): Tuple2[Array[AnyRef], Boolean] = {
var isEscaped = false
val escapedArgs = for (arg <- args) yield {
@ -613,11 +613,11 @@ private[akka] sealed class ActiveObjectAspect {
joinPoint: JoinPoint, isOneWay: Boolean, isVoid: Boolean, sender: AnyRef, senderFuture: CompletableFuture[Any]) {
override def toString: String = synchronized {
"Invocation [joinPoint: " + joinPoint.toString +
", isOneWay: " + isOneWay +
"Invocation [joinPoint: " + joinPoint.toString +
", isOneWay: " + isOneWay +
", isVoid: " + isVoid +
", sender: " + sender +
", senderFuture: " + senderFuture +
", sender: " + sender +
", senderFuture: " + senderFuture +
"]"
}
@ -660,11 +660,11 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
private var postRestart: Option[Method] = None
private var initTxState: Option[Method] = None
private var context: Option[ActiveObjectContext] = None
def this(transactionalRequired: Boolean) = this(transactionalRequired,None)
private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef, ctx: Option[ActiveObjectContext]) = {
if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired))
self.makeTransactionRequired
self.id = targetClass.getName
target = Some(targetInstance)
@ -712,7 +712,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
def receive = {
case Invocation(joinPoint, isOneWay, _, sender, senderFuture) =>
context.foreach { ctx =>
if (sender ne null) ctx._sender = sender
if (senderFuture ne null) ctx._senderFuture = senderFuture
}
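
To make the injection contract above concrete, here is a minimal sketch adapted to Scala from the Javadoc example in this hunk; 'Pong' and its 'hit' method are assumed for illustration only.

class Ping {
  // Field of type ActiveObjectContext: the runtime injects an instance here,
  // as described in the class documentation above.
  private var context: ActiveObjectContext = null

  def hit(count: Int): Unit = {
    // 'getSender' returns the Active Object that sent the current message
    val pong = context.getSender.asInstanceOf[Pong] // Pong is an assumed Active Object
    pong.hit(count + 1)
  }
}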

View file

@ -295,8 +295,10 @@ trait Actor extends Logging {
type Receive = Actor.Receive
/*
* For internal use only, functions as the implicit sender references when invoking
* one of the message send functions (!, !! and !!!).
* Option[ActorRef] representation of the 'self' ActorRef reference.
* <p/>
* Mainly for internal use, functions as the implicit sender references when invoking
* one of the message send functions ('!', '!!' and '!!!').
*/
implicit val optionSelf: Option[ActorRef] = {
val ref = Actor.actorRefInCreation.value
@ -313,8 +315,10 @@ trait Actor extends Logging {
}
/*
* For internal use only, functions as the implicit sender references when invoking
* the forward function.
* Some[ActorRef] representation of the 'self' ActorRef reference.
* <p/>
* Mainly for internal use, functions as the implicit sender references when invoking
* the 'forward' function.
*/
implicit val someSelf: Some[ActorRef] = optionSelf.asInstanceOf[Some[ActorRef]]
@ -325,9 +329,31 @@ trait Actor extends Logging {
* <pre>
* self ! message
* </pre>
* Here you also find most of the Actor API.
* <p/>
* For example fields like:
* <pre>
* self.dispatcher = ...
* self.trapExit = ...
* self.faultHandler = ...
* self.lifeCycle = ...
* self.sender
* </pre>
* <p/>
* Here you also find methods like:
* <pre>
* self.reply(..)
* self.link(..)
* self.unlink(..)
* self.start(..)
* self.stop(..)
* </pre>
*/
val self: ActorRef = optionSelf.get
self.id = getClass.getName
val self: ActorRef = {
val zelf = optionSelf.get
zelf.id = getClass.getName
zelf
}
/**
* User overridable callback/setting.
@ -339,64 +365,64 @@ trait Actor extends Logging {
* <pre>
* def receive = {
* case Ping =&gt;
* println("got a ping")
* log.info("got a 'Ping' message")
* self.reply("pong")
*
* case OneWay =&gt;
* println("got a oneway")
* log.info("got a 'OneWay' message")
*
* case _ =&gt;
* println("unknown message, ignoring")
* case unknown =&gt;
* log.warning("unknown message [%s], ignoring", unknown)
* }
* </pre>
*/
protected def receive: Receive
/**
* User overridable callback/setting.
* User overridable callback.
* <p/>
* Optional callback method that is called during initialization.
* To be implemented by subclassing actor.
* Is called when an Actor is started by invoking 'actor.start'.
*/
def init {}
/**
* User overridable callback/setting.
* User overridable callback.
* <p/>
* Mandatory callback method that is called during restart and reinitialization after a server crash.
* To be implemented by subclassing actor.
* Is called when 'actor.stop' is invoked.
*/
def shutdown {}
/**
* User overridable callback.
* <p/>
* Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated.
*/
def preRestart(reason: Throwable) {}
/**
* User overridable callback/setting.
* User overridable callback.
* <p/>
* Mandatory callback method that is called during restart and reinitialization after a server crash.
* To be implemented by subclassing actor.
* Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
*/
def postRestart(reason: Throwable) {}
/**
* User overridable callback/setting.
* User overridable callback.
* <p/>
* Optional callback method that is called during termination.
* To be implemented by subclassing actor.
* Is called during initialization. Can be used to initialize transactional state. Will be invoked within a transaction.
*/
def initTransactionalState {}
/**
* User overridable callback/setting.
* <p/>
* Optional callback method that is called during termination.
* To be implemented by subclassing actor.
*/
def shutdown {}
// =========================================
// ==== INTERNAL IMPLEMENTATION DETAILS ====
// =========================================
private[akka] def base: Receive = lifeCycles orElse (self.hotswap getOrElse receive)
private[akka] def base: Receive = try {
lifeCycles orElse (self.hotswap getOrElse receive)
} catch {
case e: NullPointerException => throw new IllegalStateException(
"The 'self' ActorRef reference for [" + getClass.getName + "] is NULL, error in the ActorRef initialization process.")
}
private val lifeCycles: Receive = {
case HotSwap(code) => self.hotswap = code
@ -414,39 +440,3 @@ trait Actor extends Logging {
override def toString = self.toString
}
/**
* Base class for the different dispatcher types.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed abstract class DispatcherType
/**
* Module that holds the different dispatcher types.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object DispatcherType {
case object EventBasedThreadPooledProxyInvokingDispatcher extends DispatcherType
case object EventBasedSingleThreadDispatcher extends DispatcherType
case object EventBasedThreadPoolDispatcher extends DispatcherType
case object ThreadBasedDispatcher extends DispatcherType
}
/**
* Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model':
* <a href="http://en.wikipedia.org/wiki/Actor_model">http://en.wikipedia.org/wiki/Actor_model</a>
* <p/>
* An actor has a well-defined (non-cyclic) life-cycle.
* <pre>
* => NEW (newly created actor) - can't receive messages (yet)
* => STARTED (when 'start' is invoked) - can receive messages
* => SHUT DOWN (when 'exit' is invoked) - can't do anything
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActorMessageInvoker private[akka] (val actorRef: ActorRef) extends MessageInvoker {
def invoke(handle: MessageInvocation) = actorRef.invoke(handle)
}
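
Pulling the pieces of this hunk together, a hedged sketch of defining and starting an actor against the 0.9 API documented above (the message strings are illustrative):

import se.scalablesolutions.akka.actor.Actor
import Actor._

class PingActor extends Actor {
  def receive = {
    case "Ping" =>
      log.info("got a 'Ping' message") // Logging is mixed into Actor
      self.reply("pong")               // 'self' is this actor's ActorRef, per the docs above
    case unknown =>
      log.warning("unknown message [%s], ignoring", unknown)
  }
}

val ping = actorOf[PingActor].start // 'init' runs on start, per the callback docs above
ping ! "Ping"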

View file

@ -40,7 +40,7 @@ import java.lang.reflect.Field
* <p/>
* Protobuf Message -> ActorRef:
* <pre>
* val actorRef = ActorRef.fromProtocol(protobufMessage)
* val actorRef = ActorRef.fromProtobuf(protobufMessage)
* actorRef ! message // send message to remote actor through its reference
* </pre>
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -51,15 +51,15 @@ object ActorRef {
* Deserializes the ActorRef instance from a byte array (Array[Byte]) into an ActorRef instance.
*/
def fromBinary(bytes: Array[Byte]): ActorRef =
fromProtocol(ActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
fromProtobuf(ActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
def fromBinary(bytes: Array[Byte], loader: ClassLoader): ActorRef =
fromProtocol(ActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
fromProtobuf(ActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
/**
* Deserializes the ActorRef instance from a Protocol Buffers (protobuf) Message into an ActorRef instance.
*/
private[akka] def fromProtocol(protocol: ActorRefProtocol, loader: Option[ClassLoader]): ActorRef =
private[akka] def fromProtobuf(protocol: ActorRefProtocol, loader: Option[ClassLoader]): ActorRef =
RemoteActorRef(
protocol.getUuid,
protocol.getActorClassName,
@ -222,7 +222,7 @@ trait ActorRef extends TransactionManagement {
* Is defined if the message was sent with '!!' or '!!!', else None.
*/
def senderFuture: Option[CompletableFuture[Any]] = guard.withGuard { _senderFuture }
/**
* Is the actor being restarted?
*/
@ -356,7 +356,7 @@ trait ActorRef extends TransactionManagement {
"\n\tYou have probably: " +
"\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." +
"\n\t\t2. Invoked a method on an Active Object from an instance NOT an Active Object.")
/**
* Use <code>reply_?(..)</code> to reply with a message to the original sender of the message currently
* being processed.
@ -527,7 +527,7 @@ trait ActorRef extends TransactionManagement {
*/
def shutdownLinkedActors: Unit
protected[akka] def toProtocol: ActorRefProtocol
protected[akka] def toProtobuf: ActorRefProtocol
protected[akka] def invoke(messageHandle: MessageInvocation): Unit
@ -572,7 +572,7 @@ trait ActorRef extends TransactionManagement {
protected def processSender(senderOption: Option[ActorRef], requestBuilder: RemoteRequestProtocol.Builder) = {
senderOption.foreach { sender =>
RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender)
requestBuilder.setSender(sender.toProtocol)
requestBuilder.setSender(sender.toProtobuf)
}
}
}
@ -595,7 +595,7 @@ sealed class LocalActorRef private[akka](
@volatile private[akka] var _supervisor: Option[ActorRef] = None
protected[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation]
protected[this] val actorInstance = new AtomicReference[Actor](newActor)
protected[this] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) }
@volatile private var isInInitialization = false
@volatile private var runActorInitialization = false
@ -609,7 +609,7 @@ sealed class LocalActorRef private[akka](
/**
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
*/
protected[akka] def toProtocol: ActorRefProtocol = guard.withGuard {
protected[akka] def toProtobuf: ActorRefProtocol = guard.withGuard {
val host = homeAddress.getHostName
val port = homeAddress.getPort
@ -637,7 +637,7 @@ sealed class LocalActorRef private[akka](
/**
* Serializes the ActorRef instance into a byte array (Array[Byte]).
*/
def toBinary: Array[Byte] = toProtocol.toByteArray
def toBinary: Array[Byte] = toProtobuf.toByteArray
/**
* Returns the class for the Actor instance that is managed by the ActorRef.
@ -947,7 +947,7 @@ sealed class LocalActorRef private[akka](
.setIsOneWay(false)
.setIsEscaped(false)
//senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol))
//senderOption.foreach(sender => requestBuilder.setSender(sender.toProtobuf))
RemoteProtocolBuilder.setMessage(message, requestBuilder)
val id = registerSupervisorAsRemoteActor
@ -1224,7 +1224,7 @@ private[akka] case class RemoteActorRef private[akka] (
extends ActorRef {
_uuid = uuuid
timeout = _timeout
start
lazy val remoteClient = RemoteClient.clientFor(hostname, port, loader)
@ -1292,7 +1292,7 @@ private[akka] case class RemoteActorRef private[akka] (
def mailboxSize: Int = unsupported
def supervisor: Option[ActorRef] = unsupported
def shutdownLinkedActors: Unit = unsupported
protected[akka] def toProtocol: ActorRefProtocol = unsupported
protected[akka] def toProtobuf: ActorRefProtocol = unsupported
protected[akka] def mailbox: Deque[MessageInvocation] = unsupported
protected[akka] def restart(reason: Throwable): Unit = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported
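
A hedged sketch of the serialization round-trip that the toProtocol -> toProtobuf rename above affects; 'ref' is assumed to be a started LocalActorRef:

val bytes: Array[Byte] = ref.toBinary               // delegates to toProtobuf.toByteArray
val restored: ActorRef = ActorRef.fromBinary(bytes) // rebuilds a RemoteActorRef via fromProtobuf
restored ! "message"                                // messages go through the remote reference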

View file

@ -63,6 +63,12 @@ object ActorRegistry extends ListenerManagement {
all.toList
}
/**
* Finds any actor that matches T.
*/
def actorFor[T <: Actor](implicit manifest: Manifest[T]): Option[ActorRef] =
actorsFor[T](manifest).headOption
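
A usage sketch of the new lookup; 'MyActor' is a hypothetical started Actor subclass:

val found: Option[ActorRef] = ActorRegistry.actorFor[MyActor]
found.foreach(_ ! "hello") // send only if such an actor is registered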
/**
* Finds all actors of the exact type specified by the class passed in as the Class argument.
*/

View file

@ -88,7 +88,8 @@ class AgentException private[akka](message: String) extends RuntimeException(mes
* </pre>
* <p/>
*
* IMPORTANT:
* <b>IMPORTANT</b>:
* <p/>
* You can *not* call 'agent.get', 'agent()' or use the monadic 'foreach',
* 'map' and 'flatMap' within an enclosing transaction since that would block
* the transaction indefinitely. But all other operations are fine. The system
@ -103,7 +104,6 @@ sealed class Agent[T] private (initialValue: T) {
import Actor._
private val dispatcher = actorOf(new AgentDispatcher[T](initialValue)).start
dispatcher ! Value(initialValue)
/**
* Submits a request to read the internal state.
@ -215,7 +215,7 @@ final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transacto
import Actor._
log.debug("Starting up Agent [%s]", self.uuid)
private lazy val value = Ref[T]()
private val value = Ref[T](initialValue)
/**
* Periodically handles incoming messages.
@ -233,6 +233,5 @@ final class AgentDispatcher[T] private[akka] (initialValue: T) extends Transacto
* Performs a CAS operation, atomically swapping the internal state with the value
* provided as a by-name parameter.
*/
private final def swap(newData: => T): Unit = value.swap(newData)
private def swap(newData: => T): Unit = value.swap(newData)
}
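
A minimal usage sketch, assuming a companion factory Agent(initialValue) and using only the operations the documentation above names ('agent()', 'agent.get', 'foreach'):

val agent = Agent(5)       // assumed factory; the dispatcher is seeded with Value(5)
val current: Int = agent() // blocking read: must NOT run inside an enclosing transaction
agent.foreach(v => println("agent holds: " + v))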

View file

@ -47,46 +47,6 @@ object Supervisor {
def apply(config: SupervisorConfig): Supervisor = SupervisorFactory(config).newInstance.start
}
/**
* Factory object for creating supervisors as Actors, it has both a declarative and a programmatic API.
* <p/>
*
* Here is a sample on how to use the programmatic API (note that the supervisor is automatically started):
* <pre>
* val supervisor = SupervisorActor(AllForOneStrategy(maxNrOfRetries, timeRange), Array(classOf[Throwable]))
* </pre>
*
* Here is a sample on how to use the declarative API:
* <pre>
* val supervisor = SupervisorActor(
* SupervisorConfig(
* RestartStrategy(OneForOne, 3, 10, List(classOf[Exception])),
* Supervise(
* myFirstActor,
* LifeCycle(Permanent)) ::
* Supervise(
* mySecondActor,
* LifeCycle(Permanent)) ::
* Nil))
* </pre>
*
* You dynamically link and unlink child actors using the 'link' and 'unlink' methods.
* <pre>
* supervisor.link(child)
* supervisor.unlink(child)
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object SupervisorActor {
def apply(config: SupervisorConfig): ActorRef = {
val (handler, trapExits) = SupervisorFactory.retrieveFaultHandlerAndTrapExitsFrom(config)
actorOf(new SupervisorActor(handler, trapExits)).start
}
def apply(handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]]): ActorRef =
actorOf(new SupervisorActor(handler, trapExceptions)).start
}
/**
* Use this factory instead of the Supervisor factory object if you want to control
* instantiation and starting of the Supervisor, if not then it is easier and better
@ -166,7 +126,7 @@ sealed class Supervisor private[akka] (
private val childActors = new ConcurrentHashMap[String, List[ActorRef]]
private val childSupervisors = new CopyOnWriteArrayList[Supervisor]
private[akka] val supervisor = SupervisorActor(handler, trapExceptions)
private[akka] val supervisor = actorOf(new SupervisorActor(handler, trapExceptions)).start
def uuid = supervisor.uuid
@ -217,19 +177,7 @@ sealed class Supervisor private[akka] (
}
/**
* Use this class when you want to create a supervisor dynamically that should only
* manage its child actors and not have any functionality by itself.
* <p/>
* Here is a sample on how to use it:
* <pre>
* val supervisor = Supervisor(AllForOneStrategy(maxNrOfRetries, timeRange), Array(classOf[Throwable]))
* </pre>
*
* You dynamically link and unlink child actors using the 'link' and 'unlink' methods.
* <pre>
* supervisor.link(child)
* supervisor.unlink(child)
* </pre>
* For internal use only.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
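
For reference, the declarative style removed from the SupervisorActor docs still works through the retained Supervisor factory object (see 'apply(config)' above); myFirstActor, mySecondActor, and child are assumed, already-created ActorRefs:

val supervisor = Supervisor(
  SupervisorConfig(
    RestartStrategy(OneForOne, 3, 10, List(classOf[Exception])),
    Supervise(myFirstActor, LifeCycle(Permanent)) ::
    Supervise(mySecondActor, LifeCycle(Permanent)) ::
    Nil))

supervisor.link(child)   // dynamic linking, as described above
supervisor.unlink(child)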

View file

@ -7,19 +7,19 @@ package se.scalablesolutions.akka.dispatch
import java.util.{LinkedList, Queue, List}
import java.util.HashMap
import se.scalablesolutions.akka.actor.{ActorMessageInvoker, Actor, ActorRef}
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) extends MessageDispatcher {
@volatile protected var active: Boolean = false
protected val queue = new ReactiveMessageQueue(name)
protected val messageInvokers = new HashMap[AnyRef, MessageInvoker]
protected val messageInvokers = new HashMap[ActorRef, ActorRef]
protected var selectorThread: Thread = _
protected val guard = new Object
def dispatch(invocation: MessageInvocation) = queue.append(invocation)
override def register(actorRef: ActorRef) = synchronized {
messageInvokers.put(actorRef, new ActorMessageInvoker(actorRef))
messageInvokers.put(actorRef, actorRef)
super.register(actorRef)
}

View file

@ -40,7 +40,7 @@ import se.scalablesolutions.akka.config.Config.config
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Dispatchers {
val THROUGHPUT = config.getInt("akka.dispatch.throughput", 5)
val THROUGHPUT = config.getInt("akka.dispatcher.throughput", 5)
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") {
override def register(actor: ActorRef) = {

View file

@ -13,6 +13,9 @@ import java.util.concurrent.ConcurrentHashMap
import org.multiverse.commitbarriers.CountDownCommitBarrier
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
final class MessageInvocation(val receiver: ActorRef,
val message: Any,
val sender: Option[ActorRef],
@ -49,14 +52,16 @@ final class MessageInvocation(val receiver: ActorRef,
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait MessageQueue {
def append(handle: MessageInvocation)
}
trait MessageInvoker {
def invoke(message: MessageInvocation)
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait MessageDispatcher extends Logging {
protected val references = new ConcurrentHashMap[String, ActorRef]
def dispatch(invocation: MessageInvocation)
@ -65,14 +70,16 @@ trait MessageDispatcher extends Logging {
def register(actorRef: ActorRef) = references.put(actorRef.uuid, actorRef)
def unregister(actorRef: ActorRef) = {
references.remove(actorRef.uuid)
if (canBeShutDown)
shutdown // shut down when the dispatcher's reference count reaches zero
if (canBeShutDown) shutdown // shut down when the dispatcher's reference count reaches zero
}
def canBeShutDown: Boolean = references.isEmpty
def isShutdown: Boolean
def usesActorMailbox : Boolean
}
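
A short sketch of the life-cycle rule encoded in register/unregister above; 'dispatcher' is assumed to be some concrete MessageDispatcher and 'actorRef' a registered actor:

dispatcher.register(actorRef)   // stores uuid -> ref; keeps the dispatcher alive
dispatcher.unregister(actorRef) // drops the ref; 'references' is now empty,
                                // so canBeShutDown holds and shutdown runs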
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait MessageDemultiplexer {
def select
def wakeUp

View file

@ -7,17 +7,16 @@ package se.scalablesolutions.akka.dispatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.Queue
import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorMessageInvoker}
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ThreadBasedDispatcher(actor: ActorRef) extends MessageDispatcher {
class ThreadBasedDispatcher(private val actor: ActorRef) extends MessageDispatcher {
private val name = actor.getClass.getName + ":" + actor.uuid
private val threadName = "thread-based:dispatcher:" + name
private val messageHandler = new ActorMessageInvoker(actor)
private val queue = new BlockingMessageQueue(name)
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
@ -30,7 +29,7 @@ class ThreadBasedDispatcher(actor: ActorRef) extends MessageDispatcher {
override def run = {
while (active) {
try {
messageHandler.invoke(queue.take)
actor.invoke(queue.take)
} catch { case e: InterruptedException => active = false }
}
}
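
A hedged sketch of dedicating such a dispatcher to a single actor, assuming the dispatcher can be assigned through 'self.dispatcher' as listed in the Actor docs earlier in this commit:

val worker = actorOf(new Actor {
  self.dispatcher = new ThreadBasedDispatcher(self) // one thread serves only this actor
  def receive = { case msg => log.info("handled [%s]", msg) }
}).start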

View file

@ -261,11 +261,11 @@ object Cluster extends Cluster with Logging {
sup <- createSupervisor(actorRef)
} {
val serializer = Class.forName(config.getString(
"akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME))
.newInstance.asInstanceOf[Serializer]
"akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME))
.newInstance.asInstanceOf[Serializer]
classLoader = serializerClassLoader orElse classLoader
serializer.classLoader = classLoader
classLoader = serializerClassLoader orElse classLoader
serializer.classLoader = classLoader
actorRef.start
sup.start
actorRef ! InitClusterActor(serializer)

View file

@ -82,19 +82,19 @@ object RemoteClient extends Logging {
private[akka] def actorFor(uuid: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef =
RemoteActorRef(uuid, className, hostname, port, timeout, loader)
def clientFor(hostname: String, port: Int): RemoteClient =
clientFor(new InetSocketAddress(hostname, port), None)
def clientFor(hostname: String, port: Int, loader: ClassLoader): RemoteClient =
clientFor(new InetSocketAddress(hostname, port), Some(loader))
def clientFor(address: InetSocketAddress): RemoteClient =
clientFor(address, None)
def clientFor(address: InetSocketAddress, loader: ClassLoader): RemoteClient =
clientFor(address, Some(loader))
private[akka] def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient =
clientFor(new InetSocketAddress(hostname, port), loader)
private[akka] def clientFor(address: InetSocketAddress, loader: Option[ClassLoader]): RemoteClient = synchronized {
@ -330,7 +330,7 @@ class RemoteClientHandler(val name: String,
client.connection = bootstrap.connect(remoteAddress)
client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
if (!client.connection.isSuccess) {
client.listeners.toArray.foreach(l =>
l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause))
log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
}
@ -339,13 +339,13 @@ class RemoteClientHandler(val name: String,
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.listeners.toArray.foreach(l =>
l.asInstanceOf[ActorRef] ! RemoteClientConnected(client.hostname, client.port))
log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.listeners.toArray.foreach(l =>
l.asInstanceOf[ActorRef] ! RemoteClientDisconnected(client.hostname, client.port))
log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress)
}
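
Since the handler above notifies its listeners with RemoteClientConnected, RemoteClientDisconnected, and RemoteClientError messages, a listener is just an actor; a hedged sketch (how listeners are registered with the client is not shown in this hunk):

class ConnectionWatcher extends Actor {
  def receive = {
    case RemoteClientConnected(host, port)    => log.info("connected to [%s:%s]", host, port)
    case RemoteClientDisconnected(host, port) => log.info("disconnected from [%s:%s]", host, port)
    case RemoteClientError(cause)             => log.error(cause, "remote client error")
  }
}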

View file

@ -184,7 +184,7 @@ class RemoteServer extends Logging {
def start(_hostname: String, _port: Int): RemoteServer =
start(_hostname, _port, None)
private def start(_hostname: String, _port: Int, loader: ClassLoader): RemoteServer =
start(_hostname, _port, Some(loader))
private def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): RemoteServer = synchronized {
@ -366,7 +366,7 @@ class RemoteServerHandler(
val message = RemoteProtocolBuilder.getMessage(request)
if (request.hasSender) {
val sender = request.getSender
if (sender ne null) actorRef.!(message)(Some(ActorRef.fromProtocol(sender, applicationLoader)))
if (sender ne null) actorRef.!(message)(Some(ActorRef.fromProtobuf(sender, applicationLoader)))
} else {
try {
val resultOrNone = actorRef !! message

View file

@ -13,7 +13,7 @@ case class Listen(listener: ActorRef) extends ListenerMessage
case class Deafen(listener: ActorRef) extends ListenerMessage
case class WithListeners(f: List[ActorRef] => Unit) extends ListenerMessage
/**
* Listeners is a generic trait to implement listening capability on an Actor.
* <p/>
* Use the <code>gossip(msg)</code> method to have it sent to the listeners.
@ -34,6 +34,6 @@ trait Listeners { self: Actor =>
}
protected def gossip(msg: Any) = listenersAsList foreach (_ ! msg)
private def listenersAsList: List[ActorRef] = listeners.toArray.toList.asInstanceOf[List[ActorRef]]
}
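
A usage sketch of the listener protocol defined by the messages at the top of this file; 'broadcaster' is assumed to be a started actor mixing in Listeners:

broadcaster ! Listen(listenerRef)                       // subscribe an ActorRef
broadcaster ! WithListeners(ls => ls.foreach(_ ! "hi")) // act on the current listener list
broadcaster ! Deafen(listenerRef)                       // unsubscribe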

View file

@ -282,10 +282,10 @@ object Transaction {
setTransaction(Some(tx))
mtx.registerLifecycleListener(new TransactionLifecycleListener() {
def notify(mtx: MultiverseTransaction, event: TransactionLifecycleEvent) = event.name match {
case "postCommit" =>
case "postCommit" =>
log.trace("Committing transaction [%s]", mtx)
tx.commit
case "postAbort" =>
case "postAbort" =>
log.trace("Aborting transaction [%s]", mtx)
tx.abort
case _ => {}
@ -309,7 +309,7 @@ object Transaction {
val id = Transaction.idFactory.incrementAndGet
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
private[akka] var transaction: Option[MultiverseTransaction] = None
private[this] val persistentStateMap = new HashMap[String, Committable]
private[this] val persistentStateMap = new HashMap[String, Committable with Abortable]
private[akka] val depth = new AtomicInteger(0)
val jta: Option[TransactionContainer] =
@ -329,9 +329,7 @@ object Transaction {
def commit = synchronized {
log.trace("Committing transaction %s", toString)
Transaction.atomic0 {
persistentStateMap.valuesIterator.foreach(_.commit)
}
persistentStateMap.valuesIterator.foreach(_.commit)
status = TransactionStatus.Completed
jta.foreach(_.commit)
}
@ -339,6 +337,8 @@ object Transaction {
def abort = synchronized {
log.trace("Aborting transaction %s", toString)
jta.foreach(_.rollback)
persistentStateMap.valuesIterator.foreach(_.abort)
persistentStateMap.clear
}
def isNew = synchronized { status == TransactionStatus.New }
@ -361,7 +361,7 @@ object Transaction {
private[akka] def isTopLevel = depth.get == 0
private[akka] def register(uuid: String, storage: Committable) = persistentStateMap.put(uuid, storage)
private[akka] def register(uuid: String, storage: Committable with Abortable) = persistentStateMap.put(uuid, storage)
private def ensureIsActive = if (status != TransactionStatus.Active)
throw new StmConfigurationException(

View file

@ -56,6 +56,13 @@ trait Committable {
def commit: Unit
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Abortable {
def abort: Unit
}
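
A hedged sketch of what the strengthened contract above means for storage backends: anything registered with a transaction (see 'register(uuid, storage: Committable with Abortable)' in Transaction.scala above) must now implement both sides:

class BufferedStorage extends Committable with Abortable {
  private var staged: List[Array[Byte]] = Nil  // writes buffered inside the transaction
  def stage(bytes: Array[Byte]): Unit = staged = bytes :: staged
  def commit: Unit = { /* flush 'staged' to the backing store */ staged = Nil }
  def abort: Unit = { staged = Nil } // on rollback the buffered writes are simply dropped
}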
object RefFactory {
private val factory = getGlobalStmInstance.getProgrammaticReferenceFactoryBuilder.build

View file

@ -21,14 +21,14 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat
val (testMsg1,testMsg2,testMsg3,testMsg4) = ("test1","test2","test3","test4")
val targetOk = new AtomicInteger(0)
val t1 = actorOf( new Actor() {
def receive = {
case `testMsg1` => self.reply(3)
case `testMsg2` => self.reply(7)
}
} ).start
val t2 = actorOf( new Actor() {
def receive = {
case `testMsg3` => self.reply(11)
}
}).start
@ -43,9 +43,9 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat
b <- (d.!![Int](testMsg2,5000))
c <- (d.!![Int](testMsg3,5000))
} yield a + b + c
result.get must be(21)
for(a <- List(t1,t2,d)) a.stop
}
@Test def testLogger = {

View file

@ -95,16 +95,42 @@ class StmSpec extends
val size2: Int = (actor !! Size).getOrElse(fail("Could not get Vector::size"))
size2 should equal(3)
} catch {
case e =>
e.printStackTrace
fail(e.toString)
}
}
}
describe("Transactor") {
it("should be able receive message sent with !! and pass it along to nested transactor with !! and receive reply; multipse times in a row") {
import GlobalTransactionVectorTestActor._
try {
val actor = actorOf[NestedTransactorLevelOneActor].start
actor !! Add(2)
val size1: Int = (actor !! Size).getOrElse(fail("Could not get size"))
size1 should equal(2)
actor !! Add(7)
actor ! "HiLevelOne"
val size2: Int = (actor !! Size).getOrElse(fail("Could not get size"))
size2 should equal(7)
actor !! Add(0)
actor ! "HiLevelTwo"
val size3: Int = (actor !! Size).getOrElse(fail("Could not get size"))
size3 should equal(0)
actor !! Add(3)
val size4: Int = (actor !! Size).getOrElse(fail("Could not get size"))
size4 should equal(3)
} catch {
case e =>
fail(e.toString)
}
}
}
/*
describe("Multiverse API") {
it("should blablabla") {
import org.multiverse.api.programmatic._
// import org.multiverse.api._
import org.multiverse.templates._
@ -113,13 +139,13 @@ class StmSpec extends
import org.multiverse.api.{GlobalStmInstance, ThreadLocalTransaction, Transaction => MultiverseTransaction}
import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent}
import org.multiverse.commitbarriers._
def createRef[T]: ProgrammaticReference[T] = GlobalStmInstance
.getGlobalStmInstance
.getProgrammaticReferenceFactoryBuilder
.build
.atomicCreateReference(null.asInstanceOf[T])
val ref1 = Ref(0)//createRef[Int]
val ref2 = Ref(0)//createRef[Int]
@ -158,15 +184,47 @@ class GlobalTransactionVectorTestActor extends Actor {
import GlobalTransactionVectorTestActor._
import se.scalablesolutions.akka.stm.Transaction.Global
private var vector: TransactionalVector[Int] = Global.atomic { TransactionalVector(1) }
private val vector: TransactionalVector[Int] = Global.atomic { TransactionalVector(1) }
def receive = {
case Add(value) =>
Global.atomic { vector + value }
self.reply(Success)
case Size =>
val size = Global.atomic { vector.size }
self.reply(size)
}
}
class NestedTransactorLevelOneActor extends Actor {
import GlobalTransactionVectorTestActor._
private val nested = actorOf[NestedTransactorLevelTwoActor].start
def receive = {
case add @ Add(_) =>
self.reply((nested !! add).get)
case Size =>
self.reply((nested !! Size).get)
case "HiLevelOne" => println("HiLevelOne")
case "HiLevelTwo" => nested ! "HiLevelTwo"
}
}
class NestedTransactorLevelTwoActor extends Actor {
import GlobalTransactionVectorTestActor._
private val ref = Ref(0)
def receive = {
case Add(value) =>
ref.swap(value)
self.reply(Success)
case Size =>
self.reply(ref.getOrElse(-1))
case "HiLevelTwo" => println("HiLevelTwo")
}
}

View file

@ -1,14 +0,0 @@
package se.scalablesolutions.akka.api;
import javax.ws.rs.Path;
import javax.ws.rs.GET;
import javax.ws.rs.Produces;
@Path("/foo")
public class JerseyFoo {
@GET
@Produces({"application/json"})
public String foo() {
return "hello foo";
}
}

View file

@ -1,11 +0,0 @@
package se.scalablesolutions.akka.api;
public class PersistenceManager {
private static volatile boolean isRunning = false;
public static void init() {
if (!isRunning) {
se.scalablesolutions.akka.kernel.Kernel$.MODULE$.startRemoteService();
isRunning = true;
}
}
}

View file

@ -1,26 +0,0 @@
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.persistence.common.*;
import se.scalablesolutions.akka.persistence.cassandra.*;
import se.scalablesolutions.akka.actor.annotation.inittransactionalstate;
public class PersistentClasher {
private PersistentMap state;
@inittransactionalstate
public void init() {
state = CassandraStorage.newMap();
}
public String getState(String key) {
return (String)state.get(key).get();
}
public void setState(String key, String msg) {
state.put(key, msg);
}
public void clash() {
state.put("clasher", "was here");
}
}

View file

@ -1,7 +0,0 @@
package se.scalablesolutions.akka.api;
public class PersistentFailer implements java.io.Serializable {
public int fail() {
throw new RuntimeException("expected");
}
}

View file

@ -1,110 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.config.*;
import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import static se.scalablesolutions.akka.config.JavaConfig.*;
import se.scalablesolutions.akka.actor.*;
import se.scalablesolutions.akka.kernel.Kernel;
import junit.framework.TestCase;
public class PersistentNestedStateTest extends TestCase {
static String messageLog = "";
final private ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
protected void setUp() {
PersistenceManager.init();
conf.configure(
new RestartStrategy(new AllForOne(), 3, 5000, new Class[] {Exception.class}),
new Component[]{
new Component(PersistentStateful.class, new LifeCycle(new Permanent()), 10000000),
new Component(PersistentStatefulNested.class, new LifeCycle(new Permanent()), 10000000),
new Component(PersistentFailer.class, new LifeCycle(new Permanent()), 1000)
//new Component("inmem-clasher", InMemClasher.class, InMemClasherImpl.class, new LifeCycle(new Permanent()), 100000)
}).inject().supervise();
}
protected void tearDown() {
conf.stop();
}
public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() throws Exception {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class);
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
nested.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
assertEquals("new state", nested.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}
public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class);
nested.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
PersistentFailer failer = conf.getInstance(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state
assertEquals("init", nested.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state
}
public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setVectorState("init"); // set init state
PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class);
nested.setVectorState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
assertEquals(2, stateful.getVectorLength()); // BAD: keeps one element since last test
assertEquals(2, nested.getVectorLength());
}
public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setVectorState("init"); // set init state
PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class);
nested.setVectorState("init"); // set init state
PersistentFailer failer = conf.getInstance(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals(1, stateful.getVectorLength());
assertEquals(1, nested.getVectorLength());
}
public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class);
stateful.setRefState("init"); // set init state
nested.setRefState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
assertEquals("new state", stateful.getRefState());
assertEquals("new state", nested.getRefState());
}
public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
PersistentStatefulNested nested = conf.getInstance(PersistentStatefulNested.class);
stateful.setRefState("init"); // set init state
nested.setRefState("init"); // set init state
PersistentFailer failer = conf.getInstance(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getRefState()); // check that state is == init state
assertEquals("init", nested.getRefState()); // check that state is == init state
}
}

View file

@ -1,89 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.config.*;
import static se.scalablesolutions.akka.config.JavaConfig.*;
import junit.framework.TestCase;
public class PersistentStateTest extends TestCase {
static String messageLog = "";
final private ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
protected void setUp() {
PersistenceManager.init();
conf.configure(
new RestartStrategy(new AllForOne(), 3, 5000, new Class[] {Exception.class}),
new Component[] {
new Component(PersistentStateful.class, new LifeCycle(new Permanent()), 10000000),
new Component(PersistentFailer.class, new LifeCycle(new Permanent()), 1000)
//new Component(PersistentClasher.class, new LifeCycle(new Permanent()), 100000)
}).supervise();
}
protected void tearDown() {
conf.stop();
}
public void testShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}
public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
PersistentFailer failer = conf.getInstance(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state
}
public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setVectorState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
assertEquals("init", stateful.getVectorState(0));
assertEquals("new state", stateful.getVectorState(1));
}
public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setVectorState("init"); // set init state
PersistentFailer failer = conf.getInstance(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getVectorState(0)); // check that state is == init state
}
public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setRefState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
assertEquals("new state", stateful.getRefState());
}
public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setRefState("init"); // set init state
PersistentFailer failer = conf.getInstance(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getRefState()); // check that state is == init state
}
}

View file

@ -1,84 +0,0 @@
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.actor.annotation.inittransactionalstate;
import se.scalablesolutions.akka.actor.annotation.transactionrequired;
import se.scalablesolutions.akka.persistence.common.*;
import se.scalablesolutions.akka.persistence.cassandra.*;
@transactionrequired
public class PersistentStateful {
private PersistentMap mapState;
private PersistentVector vectorState;
private PersistentRef refState;
@inittransactionalstate
public void init() {
mapState = CassandraStorage.newMap();
vectorState = CassandraStorage.newVector();
refState = CassandraStorage.newRef();
}
public String getMapState(String key) {
byte[] bytes = (byte[]) mapState.get(key.getBytes()).get();
return new String(bytes, 0, bytes.length);
}
public String getVectorState(int index) {
byte[] bytes = (byte[]) vectorState.get(index);
return new String(bytes, 0, bytes.length);
}
public int getVectorLength() {
return vectorState.length();
}
public String getRefState() {
if (refState.isDefined()) {
byte[] bytes = (byte[]) refState.get().get();
return new String(bytes, 0, bytes.length);
} else throw new IllegalStateException("No such element");
}
public void setMapState(String key, String msg) {
mapState.put(key.getBytes(), msg.getBytes());
}
public void setVectorState(String msg) {
vectorState.add(msg.getBytes());
}
public void setRefState(String msg) {
refState.swap(msg.getBytes());
}
public void success(String key, String msg) {
mapState.put(key.getBytes(), msg.getBytes());
vectorState.add(msg.getBytes());
refState.swap(msg.getBytes());
}
public String failure(String key, String msg, PersistentFailer failer) {
mapState.put(key.getBytes(), msg.getBytes());
vectorState.add(msg.getBytes());
refState.swap(msg.getBytes());
failer.fail();
return msg;
}
public String success(String key, String msg, PersistentStatefulNested nested) {
mapState.put(key.getBytes(), msg.getBytes());
vectorState.add(msg.getBytes());
refState.swap(msg.getBytes());
nested.success(key, msg);
return msg;
}
public String failure(String key, String msg, PersistentStatefulNested nested, PersistentFailer failer) {
mapState.put(key.getBytes(), msg.getBytes());
vectorState.add(msg.getBytes());
refState.swap(msg.getBytes());
nested.failure(key, msg, failer);
return msg;
}
}

View file

@ -1,70 +0,0 @@
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.actor.annotation.inittransactionalstate;
import se.scalablesolutions.akka.actor.annotation.transactionrequired;
import se.scalablesolutions.akka.persistence.common.*;
import se.scalablesolutions.akka.persistence.cassandra.*;
@transactionrequired
public class PersistentStatefulNested {
private PersistentMap mapState;
private PersistentVector vectorState;
private PersistentRef refState;
@inittransactionalstate
public void init() {
mapState = CassandraStorage.newMap();
vectorState = CassandraStorage.newVector();
refState = CassandraStorage.newRef();
}
public String getMapState(String key) {
byte[] bytes = (byte[]) mapState.get(key.getBytes()).get();
return new String(bytes, 0, bytes.length);
}
public String getVectorState(int index) {
byte[] bytes = (byte[]) vectorState.get(index);
return new String(bytes, 0, bytes.length);
}
public int getVectorLength() {
return vectorState.length();
}
public String getRefState() {
if (refState.isDefined()) {
byte[] bytes = (byte[]) refState.get().get();
return new String(bytes, 0, bytes.length);
} else throw new IllegalStateException("No such element");
}
public void setMapState(String key, String msg) {
mapState.put(key.getBytes(), msg.getBytes());
}
public void setVectorState(String msg) {
vectorState.add(msg.getBytes());
}
public void setRefState(String msg) {
refState.swap(msg.getBytes());
}
public String success(String key, String msg) {
mapState.put(key.getBytes(), msg.getBytes());
vectorState.add(msg.getBytes());
refState.swap(msg.getBytes());
return msg;
}
public String failure(String key, String msg, PersistentFailer failer) {
mapState.put(key.getBytes(), msg.getBytes());
vectorState.add(msg.getBytes());
refState.swap(msg.getBytes());
failer.fail();
return msg;
}
}

View file

@ -1,403 +0,0 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
package se.scalablesolutions.akka.api;
public final class ProtobufProtocol {
private ProtobufProtocol() {}
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistry registry) {
}
public static final class ProtobufPOJO extends
com.google.protobuf.GeneratedMessage {
// Use ProtobufPOJO.newBuilder() to construct.
private ProtobufPOJO() {}
private static final ProtobufPOJO defaultInstance = new ProtobufPOJO();
public static ProtobufPOJO getDefaultInstance() {
return defaultInstance;
}
public ProtobufPOJO getDefaultInstanceForType() {
return defaultInstance;
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return se.scalablesolutions.akka.api.ProtobufProtocol.internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return se.scalablesolutions.akka.api.ProtobufProtocol.internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_fieldAccessorTable;
}
// required uint64 id = 1;
public static final int ID_FIELD_NUMBER = 1;
private boolean hasId;
private long id_ = 0L;
public boolean hasId() { return hasId; }
public long getId() { return id_; }
// required string name = 2;
public static final int NAME_FIELD_NUMBER = 2;
private boolean hasName;
private java.lang.String name_ = "";
public boolean hasName() { return hasName; }
public java.lang.String getName() { return name_; }
// required bool status = 3;
public static final int STATUS_FIELD_NUMBER = 3;
private boolean hasStatus;
private boolean status_ = false;
public boolean hasStatus() { return hasStatus; }
public boolean getStatus() { return status_; }
public final boolean isInitialized() {
if (!hasId) return false;
if (!hasName) return false;
if (!hasStatus) return false;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
if (hasId()) {
output.writeUInt64(1, getId());
}
if (hasName()) {
output.writeString(2, getName());
}
if (hasStatus()) {
output.writeBool(3, getStatus());
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (hasId()) {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(1, getId());
}
if (hasName()) {
size += com.google.protobuf.CodedOutputStream
.computeStringSize(2, getName());
}
if (hasStatus()) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(3, getStatus());
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeDelimitedFrom(input).buildParsed();
}
public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeDelimitedFrom(input, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder> {
private se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO result;
// Construct using se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.newBuilder()
private Builder() {}
private static Builder create() {
Builder builder = new Builder();
builder.result = new se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO();
return builder;
}
protected se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO internalGetResult() {
return result;
}
public Builder clear() {
if (result == null) {
throw new IllegalStateException(
"Cannot call clear() after build().");
}
result = new se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO();
return this;
}
public Builder clone() {
return create().mergeFrom(result);
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.getDescriptor();
}
public se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO getDefaultInstanceForType() {
return se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.getDefaultInstance();
}
public boolean isInitialized() {
return result.isInitialized();
}
public se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO build() {
if (result != null && !isInitialized()) {
throw newUninitializedMessageException(result);
}
return buildPartial();
}
private se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO buildParsed()
throws com.google.protobuf.InvalidProtocolBufferException {
if (!isInitialized()) {
throw newUninitializedMessageException(
result).asInvalidProtocolBufferException();
}
return buildPartial();
}
public se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO buildPartial() {
if (result == null) {
throw new IllegalStateException(
"build() has already been called on this Builder.");
}
se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO returnMe = result;
result = null;
return returnMe;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO) {
return mergeFrom((se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO other) {
if (other == se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.getDefaultInstance()) return this;
if (other.hasId()) {
setId(other.getId());
}
if (other.hasName()) {
setName(other.getName());
}
if (other.hasStatus()) {
setStatus(other.getStatus());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder(
this.getUnknownFields());
while (true) {
int tag = input.readTag();
switch (tag) {
case 0:
this.setUnknownFields(unknownFields.build());
return this;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
this.setUnknownFields(unknownFields.build());
return this;
}
break;
}
case 8: {
setId(input.readUInt64());
break;
}
case 18: {
setName(input.readString());
break;
}
case 24: {
setStatus(input.readBool());
break;
}
}
}
}
// required uint64 id = 1;
public boolean hasId() {
return result.hasId();
}
public long getId() {
return result.getId();
}
public Builder setId(long value) {
result.hasId = true;
result.id_ = value;
return this;
}
public Builder clearId() {
result.hasId = false;
result.id_ = 0L;
return this;
}
// required string name = 2;
public boolean hasName() {
return result.hasName();
}
public java.lang.String getName() {
return result.getName();
}
public Builder setName(java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
result.hasName = true;
result.name_ = value;
return this;
}
public Builder clearName() {
result.hasName = false;
result.name_ = getDefaultInstance().getName();
return this;
}
// required bool status = 3;
public boolean hasStatus() {
return result.hasStatus();
}
public boolean getStatus() {
return result.getStatus();
}
public Builder setStatus(boolean value) {
result.hasStatus = true;
result.status_ = value;
return this;
}
public Builder clearStatus() {
result.hasStatus = false;
result.status_ = false;
return this;
}
}
static {
se.scalablesolutions.akka.api.ProtobufProtocol.getDescriptor();
}
static {
se.scalablesolutions.akka.api.ProtobufProtocol.internalForceInit();
}
}
private static com.google.protobuf.Descriptors.Descriptor
internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
return descriptor;
}
private static com.google.protobuf.Descriptors.FileDescriptor
descriptor;
static {
java.lang.String[] descriptorData = {
"\n4se/scalablesolutions/akka/api/Protobuf" +
"Protocol.proto\022\035se.scalablesolutions.akk" +
"a.api\"8\n\014ProtobufPOJO\022\n\n\002id\030\001 \002(\004\022\014\n\004nam" +
"e\030\002 \002(\t\022\016\n\006status\030\003 \002(\010"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
public com.google.protobuf.ExtensionRegistry assignDescriptors(
com.google.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_descriptor =
getDescriptor().getMessageTypes().get(0);
internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_se_scalablesolutions_akka_api_ProtobufPOJO_descriptor,
new java.lang.String[] { "Id", "Name", "Status", },
se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.class,
se.scalablesolutions.akka.api.ProtobufProtocol.ProtobufPOJO.Builder.class);
return null;
}
};
com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[] {
}, assigner);
}
public static void internalForceInit() {}
}

View file

@ -1,17 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.api;
/*
Compile with:
cd ./akka-fun-test-java/src/test/java
protoc se/scalablesolutions/akka/api/ProtobufProtocol.proto --java_out .
*/
message ProtobufPOJO {
required uint64 id = 1;
required string name = 2;
required bool status = 3;
}

View file

@ -1,38 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.api;
import junit.framework.TestCase;
import se.scalablesolutions.akka.serialization.SerializerFactory;
public class ProtobufSerializationTest extends TestCase {
public void testOutIn() throws Exception {
SerializerFactory factory = new SerializerFactory();
ProtobufProtocol.ProtobufPOJO pojo1 = ProtobufProtocol.ProtobufPOJO.getDefaultInstance().toBuilder().setId(1).setName("protobuf").setStatus(true).build();
byte[] bytes = factory.getProtobuf().out(pojo1);
Object obj = factory.getProtobuf().in(bytes, pojo1.getClass());
assertTrue(obj instanceof ProtobufProtocol.ProtobufPOJO);
ProtobufProtocol.ProtobufPOJO pojo2 = (ProtobufProtocol.ProtobufPOJO)obj;
assertEquals(pojo1.getId(), pojo2.getId());
assertEquals(pojo1.getName(), pojo2.getName());
assertEquals(pojo1.getStatus(), pojo2.getStatus());
}
public void testDeepClone() throws Exception {
SerializerFactory factory = new SerializerFactory();
ProtobufProtocol.ProtobufPOJO pojo1 = ProtobufProtocol.ProtobufPOJO.getDefaultInstance().toBuilder().setId(1).setName("protobuf").setStatus(true).build();
Object obj = factory.getProtobuf().deepClone(pojo1);
assertTrue(obj instanceof ProtobufProtocol.ProtobufPOJO);
ProtobufProtocol.ProtobufPOJO pojo2 = (ProtobufProtocol.ProtobufPOJO)obj;
assertEquals(pojo1.getId(), pojo2.getId());
assertEquals(pojo1.getName(), pojo2.getName());
assertEquals(pojo1.getStatus(), pojo2.getStatus());
}
}

View file

@ -1,88 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.config.*;
import static se.scalablesolutions.akka.config.JavaConfig.*;
import junit.framework.TestCase;
public class RemotePersistentStateTest extends TestCase {
static String messageLog = "";
final private ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
protected void setUp() {
PersistenceManager.init();
conf.configure(
new RestartStrategy(new AllForOne(), 3, 5000, new Class[]{Exception.class}),
new Component[] {
new Component(PersistentStateful.class, new LifeCycle(new Permanent()), 1000000, new RemoteAddress("localhost", 9999)),
new Component(PersistentFailer.class, new LifeCycle(new Permanent()), 1000000, new RemoteAddress("localhost", 9999))
}).supervise();
}
protected void tearDown() {
conf.stop();
}
public void testShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}
public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
PersistentFailer failer = conf.getInstance(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "MapShouldRollBack", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state
}
public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
int init = stateful.getVectorLength();
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "VectorShouldNotRollback"); // transactionrequired
assertEquals(init + 1, stateful.getVectorLength());
}
public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
int init = stateful.getVectorLength();
PersistentFailer failer = conf.getInstance(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals(init, stateful.getVectorLength());
}
public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setRefState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
assertEquals("new state", stateful.getRefState());
}
public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getInstance(PersistentStateful.class);
stateful.setRefState("init"); // set init state
PersistentFailer failer = conf.getInstance(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getRefState()); // check that state is == init state
}
}

View file

@ -1,16 +0,0 @@
<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
<suite name="Tests for Akka Java API module" verbose="1">
<test name="core">
<groups>
<run>
<include name="functional"/>
<include name="unit"/>
<exclude name="broken"/>
</run>
</groups>
<packages>
<package name="com.scalablesolutions.akka.api.*" />
</packages>
</test>
</suite>

View file

@ -16,11 +16,11 @@ case class ClusterCometBroadcast(name: String, msg: AnyRef) extends ClusterComet
* Enables explicit clustering of Atmosphere (Comet) resources
* Annotate the endpoint which has the @Broadcast annotation with
 * @org.atmosphere.annotation.Cluster(Array(classOf[AkkaClusterBroadcastFilter])){ name = "someUniqueName" }
* that's all folks!
 * that's all folks!
* Note: In the future, clustering comet will be transparent
*/
class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRef] {
class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter {
@BeanProperty var clusterName = ""
@BeanProperty var broadcaster : Broadcaster = null

View file

@ -20,8 +20,8 @@ class AtmosphereRestServlet extends ServletContainer with AtmosphereServletProce
private val handler = new AbstractReflectorAtmosphereHandler {
override def onRequest(event: AtmosphereResource[HttpServletRequest, HttpServletResponse]) {
if (event ne null) {
event.getRequest.setAttribute(ReflectorServletProcessor.ATMOSPHERE_RESOURCE, event)
event.getRequest.setAttribute(ReflectorServletProcessor.ATMOSPHERE_HANDLER, this)
event.getRequest.setAttribute(AtmosphereServlet.ATMOSPHERE_RESOURCE, event)
event.getRequest.setAttribute(AtmosphereServlet.ATMOSPHERE_HANDLER, this)
service(event.getRequest, event.getResponse)
}
}
@ -42,7 +42,10 @@ class AtmosphereRestServlet extends ServletContainer with AtmosphereServletProce
* <p/>
* Used by the Akka Kernel to bootstrap REST and Comet.
*/
class AkkaServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging {
class AkkaServlet extends AtmosphereServlet with Logging {
addInitParameter(AtmosphereServlet.DISABLE_ONSTATE_EVENT,"true")
addInitParameter(AtmosphereServlet.BROADCASTER_CLASS,classOf[AkkaBroadcaster].getName)
lazy val servlet = createRestServlet
protected def createRestServlet : AtmosphereRestServlet = new AtmosphereRestServlet {
@ -53,9 +56,9 @@ class AkkaServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging {
* Instead we specify what semantics we want in code.
*/
override def loadConfiguration(sc: ServletConfig) {
config = new AtmosphereConfig { supportSession = false }
setDefaultBroadcasterClassName(classOf[AkkaBroadcaster].getName)
atmosphereHandlers.put("/*", new AtmosphereServlet.AtmosphereHandlerWrapper(servlet, new AkkaBroadcaster))
config.setSupportSession(false)
isBroadcasterSpecified = true
addAtmosphereHandler("/*", servlet, new AkkaBroadcaster)
}
/**

View file

@ -47,29 +47,29 @@ class AkkaLoader extends Logging {
private def printBanner = {
log.info(
"""
      [ASCII-art "akka" logo banner elided: the original glyphs were lost in extraction,
       and the two hunks differ only in the banner's indentation.]
==================================================
""")
log.info(" Running version %s", Config.VERSION)

View file

@ -50,14 +50,14 @@ trait EmbeddedAppServer extends Bootable with Logging {
Thread.currentThread.setContextClassLoader(applicationLoader.get)
super.init(sc)
}
finally {
finally {
Thread.currentThread.setContextClassLoader(cl)
}
}
})
adapter.setContextPath(uri.getPath)
adapter.addInitParameter("cometSupport",
adapter.addInitParameter("cometSupport",
"org.atmosphere.container.GrizzlyCometSupport")
adapter.addInitParameter("com.sun.jersey.config.property.resourceConfigClass",
"com.sun.jersey.api.core.PackagesResourceConfig")
@ -65,7 +65,7 @@ trait EmbeddedAppServer extends Bootable with Logging {
config.getList("akka.rest.resource_packages").mkString(";")
)
adapter.addInitParameter("com.sun.jersey.spi.container.ResourceFilters",
config.getList("akka.rest.filters").mkString(",")
config.getList("akka.rest.filters").mkString(",")
)
if (HOME.isDefined) adapter.addRootFolder(HOME.get + "/deploy/root")

View file

@ -42,7 +42,7 @@ private[akka] object CassandraStorageBackend extends
case "ALL" => ConsistencyLevel.ALL
case "ANY" => ConsistencyLevel.ANY
case unknown => throw new IllegalArgumentException(
"Cassandra consistency level [" + unknown + "] is not supported." +
"Cassandra consistency level [" + unknown + "] is not supported." +
"\n\tExpected one of [ZERO, ONE, QUORUM, DCQUORUM, DCQUORUMSYNC, ALL, ANY] in the akka.conf configuration file.")
}
}
@ -105,9 +105,9 @@ private[akka] object CassandraStorageBackend extends
}
}
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) =
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) =
elements.foreach(insertVectorStorageEntryFor(name, _))
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {
val columnPath = new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family)
columnPath.setColumn(intToBytes(index))

View file

@ -171,4 +171,4 @@ object EmbeddedCassandraService {
def start: Unit = {}
}
*/
*/

View file

@ -30,7 +30,7 @@ class StorageException(message: String) extends RuntimeException(message)
* <pre>
* val myMap = CassandraStorage.getMap(id)
* </pre>
*
*
* Example Java usage:
* <pre>
* PersistentMap<Object, Object> myMap = MongoStorage.newMap();
@ -72,7 +72,7 @@ trait Storage {
}
/**
* Implementation of <tt>PersistentMap</tt> for every concrete
* Implementation of <tt>PersistentMap</tt> for every concrete
* storage will have the same workflow. This abstracts the workflow.
*
* Subclasses just need to provide the actual concrete instance for the
@ -81,7 +81,7 @@ trait Storage {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
with Transactional with Committable with Logging {
with Transactional with Committable with Abortable with Logging {
protected val newAndUpdatedEntries = TransactionalState.newMap[K, V]
protected val removedEntries = TransactionalState.newVector[K]
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
@ -97,6 +97,12 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
removedEntries.clear
}
def abort = {
newAndUpdatedEntries.clear
removedEntries.clear
shouldClearOnCommit.swap(false)
}
def -=(key: K) = {
remove(key)
this
@ -111,23 +117,23 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
put(key, value)
this
}
override def put(key: K, value: V): Option[V] = {
register
newAndUpdatedEntries.put(key, value)
}
override def update(key: K, value: V) = {
override def update(key: K, value: V) = {
register
newAndUpdatedEntries.update(key, value)
}
override def remove(key: K) = {
register
removedEntries.add(key)
newAndUpdatedEntries.get(key)
}
def slice(start: Option[K], count: Int): List[Tuple2[K, V]] =
slice(start, None, count)
@ -135,11 +141,11 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
storage.getMapStorageRangeFor(uuid, start, finish, count)
} catch { case e: Exception => Nil }
override def clear = {
override def clear = {
register
shouldClearOnCommit.swap(true)
}
override def contains(key: K): Boolean = try {
newAndUpdatedEntries.contains(key) ||
storage.getMapStorageEntryFor(uuid, key).isDefined
@ -157,9 +163,9 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
storage.getMapStorageEntryFor(uuid, key)
} catch { case e: Exception => None }
}
def iterator = elements
override def elements: Iterator[Tuple2[K, V]] = {
new Iterator[Tuple2[K, V]] {
private val originalList: List[Tuple2[K, V]] = try {
@ -167,10 +173,10 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
} catch {
case e: Throwable => Nil
}
private var elements = newAndUpdatedEntries.toList union originalList.reverse
private var elements = newAndUpdatedEntries.toList union originalList.reverse
override def next: Tuple2[K, V]= synchronized {
val element = elements.head
elements = elements.tail
elements = elements.tail
element
}
override def hasNext: Boolean = synchronized { !elements.isEmpty }
@ -188,7 +194,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committable {
trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committable with Abortable {
protected val newElems = TransactionalState.newVector[T]
protected val updatedElems = TransactionalState.newMap[Int, T]
protected val removedElems = TransactionalState.newVector[T]
@ -203,13 +209,20 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
updatedElems.clear
}
def abort = {
newElems.clear
updatedElems.clear
removedElems.clear
shouldClearOnCommit.swap(false)
}
def +(elem: T) = add(elem)
def add(elem: T) = {
register
newElems + elem
}
def apply(index: Int): T = get(index)
def get(index: Int): T = {
@ -218,7 +231,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
}
override def slice(start: Int, finish: Int): IndexedSeq[T] = slice(Some(start), Some(finish))
def slice(start: Option[Int], finish: Option[Int], count: Int = 0): IndexedSeq[T] = {
val buffer = new scala.collection.mutable.ArrayBuffer[T]
storage.getVectorStorageRangeFor(uuid, start, finish, count).foreach(buffer.append(_))
@ -262,21 +275,23 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait PersistentRef[T] extends Transactional with Committable {
trait PersistentRef[T] extends Transactional with Committable with Abortable {
protected val ref = new TransactionalRef[T]
val storage: RefStorageBackend[T]
def commit = if (ref.isDefined) {
storage.insertRefStorageFor(uuid, ref.get.get)
ref.swap(null.asInstanceOf[T])
ref.swap(null.asInstanceOf[T])
}
def abort = ref.swap(null.asInstanceOf[T])
def swap(elem: T) = {
register
ref.swap(elem)
}
def get: Option[T] = if (ref.isDefined) ref.get else storage.getRefStorageFor(uuid)
def isDefined: Boolean = ref.isDefined || storage.getRefStorageFor(uuid).isDefined
@ -294,7 +309,7 @@ trait PersistentRef[T] extends Transactional with Committable {
}
/**
* Implementation of <tt>PersistentQueue</tt> for every concrete
* Implementation of <tt>PersistentQueue</tt> for every concrete
* storage will have the same workflow. This abstracts the workflow.
* <p/>
* Enqueue is simpler, we just have to record the operation in a local
@ -319,7 +334,7 @@ trait PersistentRef[T] extends Transactional with Committable {
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
with Transactional with Committable with Logging {
with Transactional with Committable with Abortable with Logging {
sealed trait QueueOp
case object ENQ extends QueueOp
@ -356,8 +371,17 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
enqueuedNDequeuedEntries.clear
localQ.swap(Queue.empty)
pickMeForDQ.swap(0)
shouldClearOnCommit.swap(false)
}
def abort = {
enqueuedNDequeuedEntries.clear
shouldClearOnCommit.swap(false)
localQ.swap(Queue.empty)
pickMeForDQ.swap(0)
}
override def enqueue(elems: A*) {
register
elems.foreach(e => {
@ -382,19 +406,17 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
val (a, q) = localQ.get.get.dequeue
localQ.swap(q)
a
}
else
throw new NoSuchElementException("trying to dequeue from empty queue")
} else throw new NoSuchElementException("trying to dequeue from empty queue")
}
}
override def clear = {
override def clear = {
register
shouldClearOnCommit.swap(true)
localQ.swap(Queue.empty)
pickMeForDQ.swap(0)
}
override def size: Int = try {
storage.size(uuid) + localQ.get.get.length
} catch { case e: Exception => 0 }
@ -402,11 +424,11 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
override def isEmpty: Boolean =
size == 0
override def +=(elem: A) = {
override def +=(elem: A) = {
enqueue(elem)
this
}
def ++=(elems: Iterator[A]) = {
def ++=(elems: Iterator[A]) = {
enqueue(elems.toList: _*)
this
}
@ -428,7 +450,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
* Implements a template for a concrete persistent transactional sorted set based storage.
* <p/>
* Sorting is done based on a <i>zscore</i>. But the computation of zscore has been kept
* outside the abstraction.
* outside the abstraction.
* <p/>
* zscore can be implemented in a variety of ways by the calling class:
* <pre>
@ -445,7 +467,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
* class Foo {
* //..
* }
*
*
* implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable {
* def toZScore = {
* //..
@ -457,9 +479,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
*
 * @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
trait PersistentSortedSet[A]
extends Transactional
with Committable {
trait PersistentSortedSet[A] extends Transactional with Committable with Abortable {
protected val newElems = TransactionalState.newMap[A, Float]
protected val removedElems = TransactionalState.newVector[A]
@ -473,6 +493,11 @@ trait PersistentSortedSet[A]
removedElems.clear
}
def abort = {
newElems.clear
removedElems.clear
}
def +(elem: A, score: Float) = add(elem, score)
def add(elem: A, score: Float) = {
@ -501,7 +526,7 @@ trait PersistentSortedSet[A]
}
}
}
def size: Int = newElems.size + storage.zcard(uuid) - removedElems.size
def zscore(elem: A): Float = {
@ -516,9 +541,9 @@ trait PersistentSortedSet[A]
implicit def order(x: (A, Float)) = new Ordered[(A, Float)] {
def compare(that: (A, Float)) = x._2 compare that._2
}
implicit def ordering = new scala.math.Ordering[(A,Float)] {
def compare(x: (A, Float),y : (A,Float)) = x._2 compare y._2
def compare(x: (A, Float),y : (A,Float)) = x._2 compare y._2
}
@ -531,7 +556,7 @@ trait PersistentSortedSet[A]
// -1 means the last element, -2 means the second last
val s = if (start < 0) start + l else start
val e =
val e =
if (end < 0) end + l
else if (end >= l) (l - 1)
else end
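
The recurring change in this file is the new Abortable mixin: each persistent structure buffers its pending operations in transactional state, flushes them to the backend on commit, and now discards them on abort. A minimal sketch of that lifecycle — only the Committable/Abortable names come from the diff; everything else is an assumed illustration, not the actual implementation:

trait Committable { def commit: Unit }
trait Abortable   { def abort: Unit }

// A buffer of pending write operations: applied to the backend on commit,
// simply dropped on abort (which is what "rollback" means here).
class PendingWrites[T](applyToBackend: T => Unit) extends Committable with Abortable {
  private var pending: List[T] = Nil
  def record(op: T): Unit = pending ::= op                        // buffer inside the tx
  def commit: Unit = { pending.reverse.foreach(applyToBackend); pending = Nil }
  def abort: Unit  = { pending = Nil }                            // forget buffered ops
}

PersistentMap.abort above follows the same shape: clear newAndUpdatedEntries and removedEntries, and reset shouldClearOnCommit.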

View file

@ -26,7 +26,7 @@ trait VectorStorageBackend[T] extends StorageBackend {
def updateVectorStorageEntryFor(name: String, index: Int, elem: T)
def getVectorStorageEntryFor(name: String, index: Int): T
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[T]
def getVectorStorageSizeFor(name: String): Int
def getVectorStorageSizeFor(name: String): Int
}
// for Ref
@ -47,17 +47,17 @@ trait RefStorageBackend[T] extends StorageBackend {
trait QueueStorageBackend[T] extends StorageBackend {
// add to the end of the queue
def enqueue(name: String, item: T): Boolean
// pop from the front of the queue
def dequeue(name: String): Option[T]
// get the size of the queue
def size(name: String): Int
// return an array of items currently stored in the queue
// start is the item to begin, count is how many items to return
def peek(name: String, start: Int, count: Int): List[T]
// completely delete the queue
def remove(name: String): Boolean
}
@ -65,19 +65,19 @@ trait QueueStorageBackend[T] extends StorageBackend {
trait SortedSetStorageBackend[T] extends StorageBackend {
// add item to sorted set identified by name
def zadd(name: String, zscore: String, item: T): Boolean
// remove item from sorted set identified by name
def zrem(name: String, item: T): Boolean
// cardinality of the set identified by name
def zcard(name: String): Int
// zscore of the item from sorted set identified by name
def zscore(name: String, item: T): Option[Float]
// zrange from the sorted set identified by name
def zrange(name: String, start: Int, end: Int): List[T]
// zrange with score from the sorted set identified by name
def zrangeWithScore(name: String, start: Int, end: Int): List[(T, Float)]
def zrangeWithScore(name: String, start: Int, end: Int): List[(T, Float)]
}
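
These traits are the SPI that the persistent structures delegate to. As a rough illustration of the queue contract, a hypothetical in-memory backend — assuming StorageBackend itself is a marker trait; real backends (Redis, Cassandra, MongoDB) persist out of process:

import scala.collection.mutable

object InMemoryQueueBackend extends QueueStorageBackend[Array[Byte]] {
  private val queues = mutable.Map.empty[String, mutable.Queue[Array[Byte]]]
  private def q(name: String) = queues.getOrElseUpdate(name, new mutable.Queue[Array[Byte]])

  def enqueue(name: String, item: Array[Byte]): Boolean = { q(name).enqueue(item); true }
  def dequeue(name: String): Option[Array[Byte]] =
    if (q(name).isEmpty) None else Some(q(name).dequeue)
  def size(name: String): Int = q(name).size
  // start is the first item to return, count is how many items to return
  def peek(name: String, start: Int, count: Int): List[Array[Byte]] =
    q(name).toList.slice(start, start + count)
  def remove(name: String): Boolean = queues.remove(name).isDefined
}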

View file

@ -280,7 +280,7 @@ private[akka] object MongoStorageBackend extends
}
val currentList = dbobj.get(VALUE).asInstanceOf[JArrayList[AnyRef]]
currentList.set(index, serializer.out(elem))
coll.update(q,
coll.update(q,
new BasicDBObject().append(KEY, name).append(VALUE, currentList))
}

View file

@ -1,10 +1,12 @@
package se.scalablesolutions.akka.persistence.redis
import org.scalatest.junit.JUnitSuite
import org.junit.{Test, Before}
import org.junit.Assert._
import se.scalablesolutions.akka.actor.{Actor, ActorRef, Transactor}
import Actor._
import se.scalablesolutions.akka.actor.{ActorRef, Transactor}
import se.scalablesolutions.akka.actor.Actor._
/**
* A persistent actor based on Redis storage.
@ -23,10 +25,10 @@ case class Balance(accountNo: String)
case class Debit(accountNo: String, amount: BigInt, failer: ActorRef)
case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: ActorRef)
case class Credit(accountNo: String, amount: BigInt)
case class Log(start: Int, finish: Int)
case object LogSize
class AccountActor extends Transactor {
import self._
private lazy val accountState = RedisStorage.newMap
private lazy val txnLog = RedisStorage.newVector
//timeout = 5000
@ -35,7 +37,7 @@ class AccountActor extends Transactor {
// check balance
case Balance(accountNo) =>
txnLog.add("Balance:%s".format(accountNo).getBytes)
self.reply(BigInt(new String(accountState.get(accountNo.getBytes).get)))
reply(BigInt(new String(accountState.get(accountNo.getBytes).get)))
// debit amount: can fail
case Debit(accountNo, amount, failer) =>
@ -49,7 +51,7 @@ class AccountActor extends Transactor {
accountState.put(accountNo.getBytes, (m - amount).toString.getBytes)
if (amount > m)
failer !! "Failure"
else self.reply(m - amount)
reply(m - amount)
// many debits: can fail
// demonstrates true rollback even if multiple puts have been done
@ -67,7 +69,7 @@ class AccountActor extends Transactor {
accountState.put(accountNo.getBytes, (m - bal).toString.getBytes)
}
if (bal > m) failer !! "Failure"
self.reply(m - bal)
reply(m - bal)
// credit amount
case Credit(accountNo, amount) =>
@ -79,13 +81,10 @@ class AccountActor extends Transactor {
case None => 0
}
accountState.put(accountNo.getBytes, (m + amount).toString.getBytes)
self.reply(m + amount)
reply(m + amount)
case LogSize =>
self.reply(txnLog.length.asInstanceOf[AnyRef])
case Log(start, finish) =>
self.reply(txnLog.slice(start, finish))
reply(txnLog.length.asInstanceOf[AnyRef])
}
}
@ -97,62 +96,35 @@ class AccountActor extends Transactor {
}
}
import org.scalatest.junit.JUnitSuite
class RedisPersistentActorSpec extends JUnitSuite {
@Test
def testSuccessfulDebit {
def testSuccessfulDebit = {
val bactor = actorOf[AccountActor]
bactor.start
val failer = actorOf[PersistentFailerActor]
failer.start
bactor !! Credit("a-123", 5000)
bactor !! Debit("a-123", 3000, failer)
assertEquals(BigInt(2000), (bactor !! Balance("a-123")).get)
val acc = "a-123"
bactor !! Credit("a-123", 7000)
assertEquals(BigInt(9000), (bactor !! Balance("a-123")).get)
println("----------- SIZE 0 " + (bactor !! LogSize).get)
bactor !! Debit("a-123", 8000, failer)
assertEquals(BigInt(1000), (bactor !! Balance("a-123")).get)
bactor !! Credit(acc, 5000)
println("----------- SIZE 1 " + (bactor !! LogSize).get)
println(bactor !! Balance(acc))
println("----------- SIZE 2 " + (bactor !! LogSize).get)
bactor !! Debit(acc, 3000, failer)
println("----------- SIZE 3 " + (bactor !! LogSize).get)
assertEquals(BigInt(2000), (bactor !! Balance(acc)).get)
println("----------- SIZE 4 " + (bactor !! LogSize).get)
bactor !! Credit(acc, 7000)
println("----------- SIZE 5 " + (bactor !! LogSize).get)
assertEquals(BigInt(9000), (bactor !! Balance(acc)).get)
println("----------- SIZE 6 " + (bactor !! LogSize).get)
bactor !! Debit(acc, 8000, failer)
println("----------- SIZE 7 " + (bactor !! LogSize).get)
assertEquals(BigInt(1000), (bactor !! Balance(acc)).get)
println("----------- SIZE 8 " + (bactor !! LogSize).get)
assert(7 === (bactor !! LogSize).get) // Not counting the failed transaction => 7
import scala.collection.mutable.ArrayBuffer
assert((bactor !! Log(0, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 7)
assert((bactor !! Log(0, 0)).get.asInstanceOf[ArrayBuffer[String]].size == 0)
assert((bactor !! Log(1, 2)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
assert((bactor !! Log(6, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
assert((bactor !! Log(0, 1)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
val c: Int = (bactor !! LogSize).get
assertTrue(7 == c)
}
/**
@Test
def testUnsuccessfulDebit {
val bactor = actorOf(new AccountActor)
def testUnsuccessfulDebit = {
val bactor = actorOf[AccountActor]
bactor.start
bactor !! Credit("a-123", 5000)
assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get)
val failer = actorOf(new PersistentFailerActor)
val failer = actorOf[PersistentFailerActor]
failer.start
try {
bactor !! Debit("a-123", 7000, failer)
@ -162,19 +134,19 @@ class RedisPersistentActorSpec extends JUnitSuite {
assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get)
// should not count the failed one
// val c: Int = (bactor !! LogSize).get
// assertTrue(3 == c)
val c: Int = (bactor !! LogSize).get
assertTrue(3 == c)
}
@Test
def testUnsuccessfulMultiDebit {
val bactor = actorOf(new AccountActor)
def testUnsuccessfulMultiDebit = {
val bactor = actorOf[AccountActor]
bactor.start
bactor !! Credit("a-123", 5000)
assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get)
val failer = actorOf(new PersistentFailerActor)
val failer = actorOf[PersistentFailerActor]
failer.start
try {
bactor !! MultiDebit("a-123", List(500, 2000, 1000, 3000), failer)
@ -184,8 +156,7 @@ class RedisPersistentActorSpec extends JUnitSuite {
assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get)
// should not count the failed one
// val c: Int = (bactor !! LogSize).get
// assertTrue(3 == c)
val c: Int = (bactor !! LogSize).get
assertTrue(3 == c)
}
**/
}

View file

@ -87,7 +87,7 @@ object World {
pingEvery(evaporator, EvapMillis)
}
private def pingEvery(actor: ActorRef, millis: Long) =
private def pingEvery(actor: ActorRef, millis: Long) =
Scheduler.schedule(actor, "ping", Config.StartDelay, millis, TimeUnit.MILLISECONDS)
}

View file

@ -26,4 +26,4 @@ object Application1 {
println(actor2 !! Message("actor2"))
}
}
}

View file

@ -19,4 +19,4 @@ object Application2 {
RemoteNode.start("localhost", 7777)
RemoteNode.register("remote2", actorOf[RemoteActor2].start)
}
}
}

View file

@ -77,4 +77,4 @@ class CustomRouteBuilder extends RouteBuilder {
}
})
}
}
}

View file

@ -27,17 +27,17 @@ First we need to download, build and start up Redis:
4. Run: ./redis-server.
For details on how to set up the Redis server, have a look at http://code.google.com/p/redis/wiki/QuickStart.
Then to run the sample:
Then to run the sample:
1. Fire up two shells. For each of them:
- Step down into the root of the Akka distribution.
- Set 'export AKKA_HOME=<root of distribution>'.
- Run 'sbt console' to start up a REPL (interpreter).
2. In the first REPL, execute:
2. In the first REPL, execute:
- scala> import sample.chat._
- scala> import se.scalablesolutions.akka.actor.Actor._
- scala> val chatService = actorOf[ChatService].start
3. In the second REPL, execute:
3. In the second REPL, execute:
- scala> import sample.chat._
- scala> Runner.run
4. See the chat simulation run.
@ -60,12 +60,12 @@ case class ChatMessage(from: String, message: String) extends Event
/**
* Chat client.
*/
class ChatClient(val name: String) {
class ChatClient(val name: String) {
val chat = RemoteClient.actorFor("chat:service", "localhost", 9999)
def login = chat ! Login(name)
def logout = chat ! Logout(name)
def post(message: String) = chat ! ChatMessage(name, name + ": " + message)
def login = chat ! Login(name)
def logout = chat ! Logout(name)
def post(message: String) = chat ! ChatMessage(name, name + ": " + message)
def chatLog: ChatLog = (chat !! GetChatLog(name)).getOrElse(throw new Exception("Couldn't get the chat log from ChatServer"))
}
@ -75,15 +75,15 @@ class ChatClient(val name: String) {
class Session(user: String, storage: ActorRef) extends Actor {
private val loginTime = System.currentTimeMillis
private var userLog: List[String] = Nil
log.info("New session for user [%s] has been created at [%s]", user, loginTime)
def receive = {
case msg @ ChatMessage(from, message) =>
case msg @ ChatMessage(from, message) =>
userLog ::= message
storage ! msg
case msg @ GetChatLog(_) =>
case msg @ GetChatLog(_) =>
storage forward msg
}
}
@ -97,24 +97,24 @@ trait ChatStorage extends Actor
* Redis-backed chat storage implementation.
*/
class RedisChatStorage extends ChatStorage {
self.lifeCycle = Some(LifeCycle(Permanent))
self.lifeCycle = Some(LifeCycle(Permanent))
val CHAT_LOG = "akka.chat.log"
private var chatLog = atomic { RedisStorage.getVector(CHAT_LOG) }
log.info("Redis-based chat storage is starting up...")
def receive = {
case msg @ ChatMessage(from, message) =>
case msg @ ChatMessage(from, message) =>
log.debug("New chat message [%s]", message)
atomic { chatLog + message.getBytes("UTF-8") }
case GetChatLog(_) =>
case GetChatLog(_) =>
val messageList = atomic { chatLog.map(bytes => new String(bytes, "UTF-8")).toList }
self.reply(ChatLog(messageList))
}
override def postRestart(reason: Throwable) = chatLog = RedisStorage.getVector(CHAT_LOG)
override def postRestart(reason: Throwable) = chatLog = RedisStorage.getVector(CHAT_LOG)
}
/**
@ -122,27 +122,27 @@ class RedisChatStorage extends ChatStorage {
* <p/>
* Uses self-type annotation (this: Actor =>) to declare that it needs to be mixed in with an Actor.
*/
trait SessionManagement { this: Actor =>
trait SessionManagement { this: Actor =>
val storage: ActorRef // needs someone to provide the ChatStorage
val sessions = new HashMap[String, ActorRef]
protected def sessionManagement: Receive = {
case Login(username) =>
case Login(username) =>
log.info("User [%s] has logged in", username)
val session = actorOf(new Session(username, storage))
session.start
sessions += (username -> session)
case Logout(username) =>
case Logout(username) =>
log.info("User [%s] has logged out", username)
val session = sessions(username)
session.stop
sessions -= username
}
protected def shutdownSessions =
sessions.foreach { case (_, session) => session.stop }
sessions -= username
}
protected def shutdownSessions =
sessions.foreach { case (_, session) => session.stop }
}
/**
@ -152,7 +152,7 @@ trait SessionManagement { this: Actor =>
*/
trait ChatManagement { this: Actor =>
val sessions: HashMap[String, ActorRef] // needs someone to provide the Session map
protected def chatManagement: Receive = {
case msg @ ChatMessage(from, _) => sessions(from) ! msg
case msg @ GetChatLog(from) => sessions(from) forward msg
@ -172,20 +172,20 @@ trait RedisChatStorageFactory { this: Actor =>
trait ChatServer extends Actor {
self.faultHandler = Some(OneForOneStrategy(5, 5000))
self.trapExit = List(classOf[Exception])
val storage: ActorRef
log.info("Chat server is starting up...")
// actor message handler
def receive = sessionManagement orElse chatManagement
// abstract methods to be defined somewhere else
protected def chatManagement: Receive
protected def sessionManagement: Receive
protected def sessionManagement: Receive
protected def shutdownSessions: Unit
override def shutdown = {
override def shutdown = {
log.info("Chat server is shutting down...")
shutdownSessions
self.unlink(storage)
@ -200,10 +200,10 @@ trait ChatServer extends Actor {
* val chatService = Actor.actorOf[ChatService].start
* </pre>
*/
class ChatService extends
ChatServer with
SessionManagement with
ChatManagement with
class ChatService extends
ChatServer with
SessionManagement with
ChatManagement with
RedisChatStorageFactory {
override def init = {
RemoteNode.start("localhost", 9999)
@ -217,7 +217,7 @@ class ChatService extends
object Runner {
def run = {
val client = new ChatClient("jonas")
client.login
client.post("Hi there")
@ -228,4 +228,4 @@ object Runner {
client.logout
}
}
}

View file

@ -23,7 +23,7 @@ class Boot extends Logging {
def boot {
// where to search snippet
LiftRules.addToPackages("sample.lift")
LiftRules.httpAuthProtectedResource.prepend {
case (Req("liftcount" :: Nil, _, _)) => Full(AuthRole("admin"))
}
@ -35,9 +35,9 @@ class Boot extends Logging {
true
}
}
LiftRules.passNotFoundToChain = true
val factory = SupervisorFactory(
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
@ -49,7 +49,7 @@ class Boot extends Logging {
LifeCycle(Permanent)) ::
Nil))
factory.newInstance.start
// Build SiteMap
// val entries = Menu(Loc("Home", List("index"), "Home")) :: Nil
// LiftRules.setSiteMap(SiteMap(entries:_*))

View file

@ -13,4 +13,4 @@ object LiftConsole {
exit(0)
}
}
*/
*/

View file

@ -10,7 +10,7 @@ import se.scalablesolutions.akka.actor.Actor._
/**
* Sample Akka application for Redis PubSub
*
*
* Prerequisite: Need Redis Server running (the version that supports pubsub)
* <pre>
* 1. Download redis from http://github.com/antirez/redis
@ -65,7 +65,7 @@ object Sub {
val r = new RedisClient("localhost", 6379)
val s = actorOf(new Subscriber(r))
s.start
s ! Register(callback)
s ! Register(callback)
def sub(channels: String*) = {
s ! Subscribe(channels.toArray)
@ -78,29 +78,29 @@ object Sub {
def callback(pubsub: PubSubMessage) = pubsub match {
case S(channel, no) => println("subscribed to " + channel + " and count = " + no)
case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no)
case M(channel, msg) =>
case M(channel, msg) =>
msg match {
// exit will unsubscribe from all channels and stop subscription service
case "exit" =>
case "exit" =>
println("unsubscribe all ..")
r.unsubscribe
// message "+x" will subscribe to channel x
case x if x startsWith "+" =>
case x if x startsWith "+" =>
val s: Seq[Char] = x
s match {
case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => }
}
// message "-x" will unsubscribe from channel x
case x if x startsWith "-" =>
case x if x startsWith "-" =>
val s: Seq[Char] = x
s match {
case Seq('-', rest @ _*) => r.unsubscribe(rest.toString)
}
// other message receive
case x =>
case x =>
println("received message on channel " + channel + " as : " + x)
}
}
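
A hypothetical session driving the subscriber above (the channel names are invented for illustration); the "+", "-" and "exit" conventions come straight from the callback:

Sub.sub("news", "sports")   // subscribe to two channels via the Subscriber actor
// Then, from redis-cli:
//   PUBLISH news "+weather"   -- subscribes this client to "weather" as well
//   PUBLISH news "-sports"    -- unsubscribes it from "sports"
//   PUBLISH news "exit"       -- unsubscribes from all channels and stops the service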

View file

@ -11,7 +11,7 @@ import se.scalablesolutions.akka.util.Logging
class RemoteHelloWorldActor extends RemoteActor("localhost", 9999) {
def receive = {
case "Hello" =>
case "Hello" =>
log.info("Received 'Hello'")
self.reply("World")
}
@ -27,7 +27,7 @@ object ClientManagedRemoteActorServer extends Logging {
}
object ClientManagedRemoteActorClient extends Logging {
def run = {
val actor = actorOf[RemoteHelloWorldActor].start
log.info("Remote actor created, moved to the server")

View file

@ -11,7 +11,7 @@ import se.scalablesolutions.akka.util.Logging
class HelloWorldActor extends Actor {
def receive = {
case "Hello" =>
case "Hello" =>
log.info("Received 'Hello'")
self.reply("World")
}
@ -30,7 +30,7 @@ object ServerManagedRemoteActorServer extends Logging {
}
object ServerManagedRemoteActorClient extends Logging {
def run = {
val actor = RemoteClient.actorFor("hello-service", "localhost", 9999)
log.info("Remote client created")

View file

@ -1,13 +1,16 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package sample.rest.java;
import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import static se.scalablesolutions.akka.config.JavaConfig.*;
public class Boot {
final private ActiveObjectConfigurator manager = new ActiveObjectConfigurator();
public Boot() throws Exception {
manager.configure(
public final static ActiveObjectConfigurator configurator = new ActiveObjectConfigurator();
static {
configurator.configure(
new RestartStrategy(new OneForOne(), 3, 5000, new Class[]{Exception.class}),
new Component[] {
new Component(
@ -19,5 +22,5 @@ public class Boot {
new LifeCycle(new Permanent()),
1000)
}).supervise();
}
}
}

View file

@ -4,10 +4,6 @@
package sample.rest.java;
import javax.ws.rs.Path;
import javax.ws.rs.GET;
import javax.ws.rs.Produces;
import se.scalablesolutions.akka.actor.annotation.transactionrequired;
import se.scalablesolutions.akka.actor.annotation.prerestart;
import se.scalablesolutions.akka.actor.annotation.postrestart;
@ -16,14 +12,6 @@ import se.scalablesolutions.akka.persistence.cassandra.CassandraStorage;
import java.nio.ByteBuffer;
/**
* Try service out by invoking (multiple times):
* <pre>
* curl http://localhost:9998/persistentjavacount
* </pre>
* Or browse to the URL from a web browser.
*/
@Path("/persistentjavacount")
@transactionrequired
public class PersistentSimpleService {
private String KEY = "COUNTER";
@ -31,8 +19,6 @@ public class PersistentSimpleService {
private boolean hasStartedTicking = false;
private PersistentMap<byte[], byte[]> storage;
@GET
@Produces({"application/html"})
public String count() {
if (storage == null) storage = CassandraStorage.newMap();
if (!hasStartedTicking) {

View file

@ -0,0 +1,27 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package sample.rest.java;
import javax.ws.rs.Path;
import javax.ws.rs.GET;
import javax.ws.rs.Produces;
/**
* Try service out by invoking (multiple times):
* <pre>
* curl http://localhost:9998/persistentjavacount
* </pre>
* Or browse to the URL from a web browser.
*/
@Path("/persistentjavacount")
public class PersistentSimpleServiceRest {
private PersistentSimpleService service = (PersistentSimpleService) Boot.configurator.getInstance(PersistentSimpleService.class);
@GET
@Produces({"application/json"})
public String count() {
return service.count();
}
}

View file

@ -4,10 +4,6 @@
package sample.rest.java;
import javax.ws.rs.Path;
import javax.ws.rs.GET;
import javax.ws.rs.Produces;
import se.scalablesolutions.akka.actor.ActiveObject;
import se.scalablesolutions.akka.actor.ActiveObjectContext;
import se.scalablesolutions.akka.actor.annotation.transactionrequired;
@ -16,14 +12,6 @@ import se.scalablesolutions.akka.actor.annotation.postrestart;
import se.scalablesolutions.akka.stm.TransactionalState;
import se.scalablesolutions.akka.stm.TransactionalMap;
/**
* Try service out by invoking (multiple times):
* <pre>
* curl http://localhost:9998/javacount
* </pre>
* Or browse to the URL from a web browser.
*/
@Path("/javacount")
@transactionrequired
public class SimpleService {
private String KEY = "COUNTER";
@ -32,8 +20,6 @@ public class SimpleService {
private TransactionalMap<String, Integer> storage;
private Receiver receiver = ActiveObject.newInstance(Receiver.class);
@GET
@Produces({"application/json"})
public String count() {
if (storage == null) storage = TransactionalState.newMap();
if (!hasStartedTicking) {

View file

@ -0,0 +1,27 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package sample.rest.java;
import javax.ws.rs.Path;
import javax.ws.rs.GET;
import javax.ws.rs.Produces;
/**
* Try service out by invoking (multiple times):
* <pre>
* curl http://localhost:9998/javacount
* </pre>
* Or browse to the URL from a web browser.
*/
@Path("/javacount")
public class SimpleServiceRest {
private SimpleService service = (SimpleService) Boot.configurator.getInstance(SimpleService.class);
@GET
@Produces({"application/json"})
public String count() {
return service.count();
}
}

View file

@ -51,12 +51,12 @@ class SimpleService {
@GET
@Produces(Array("text/html"))
def count = {
//Fetch the first actor of type SimpleServiceActor
//Send it the "Tick" message and expect a NodeSeq back
val result = for{a <- actorsFor(classOf[SimpleServiceActor]).headOption
r <- a.!![NodeSeq]("Tick")} yield r
//Return either the resulting NodeSeq or a default one
result getOrElse <error>Error in counter</error>
//Fetch the first actor of type SimpleServiceActor
//Send it the "Tick" message and expect a NodeSeq back
val result = for{a <- actorsFor(classOf[SimpleServiceActor]).headOption
r <- a.!![NodeSeq]("Tick")} yield r
//Return either the resulting NodeSeq or a default one
result getOrElse <error>Error in counter</error>
}
}
class SimpleServiceActor extends Transactor {
@ -105,12 +105,12 @@ class PersistentSimpleService {
@GET
@Produces(Array("text/html"))
def count = {
//Fetch the first actor of type PersistentSimpleServiceActor
//Send it the "Tick" message and expect a NodeSeq back
val result = for{a <- actorsFor(classOf[PersistentSimpleServiceActor]).headOption
r <- a.!![NodeSeq]("Tick")} yield r
//Return either the resulting NodeSeq or a default one
result getOrElse <error>Error in counter</error>
//Fetch the first actor of type PersistentSimpleServiceActor
//Send it the "Tick" message and expect a NodeSeq back
val result = for{a <- actorsFor(classOf[PersistentSimpleServiceActor]).headOption
r <- a.!![NodeSeq]("Tick")} yield r
//Return either the resulting NodeSeq or a default one
result getOrElse <error>Error in counter</error>
}
}
@ -147,18 +147,18 @@ class Chat {
@Consumes(Array("application/x-www-form-urlencoded"))
@Produces(Array("text/html"))
def publishMessage(form: MultivaluedMap[String, String]) = {
val msg = ChatMsg(form.getFirst("name"),form.getFirst("action"),form.getFirst("message"))
val msg = ChatMsg(form.getFirst("name"),form.getFirst("action"),form.getFirst("message"))
//Fetch the first actor of type ChatActor
//Send it the chat message and expect a String back
val result = for{a <- actorsFor(classOf[ChatActor]).headOption
r <- a.!![String](msg)} yield r
//Return either the resulting String or a default one
result getOrElse "System__error"
//Send it the chat message and expect a String back
val result = for{a <- actorsFor(classOf[ChatActor]).headOption
r <- a.!![String](msg)} yield r
//Return either the resulting String or a default one
result getOrElse "System__error"
}
}
object ChatActor {
case class ChatMsg(val who: String, val what: String, val msg: String)
case class ChatMsg(val who: String, val what: String, val msg: String)
}
class ChatActor extends Actor with Logging {

View file

@ -120,12 +120,12 @@ class SecureTickService {
def paranoiaTick = tick
def tick = {
//Fetch the first actor of type SecureTickActor
//Send it the "Tick" message and expect an Integer back
val result = for{a <- actorsFor(classOf[SecureTickActor]).headOption
r <- a.!![Integer]("Tick")} yield r
//Return either the resulting NodeSeq or a default one
result match {
//Fetch the first actor of type SecureTickActor
//Send it the "Tick" message and expect an Integer back
val result = for{a <- actorsFor(classOf[SecureTickActor]).headOption
r <- a.!![Integer]("Tick")} yield r
//Return either the resulting NodeSeq or a default one
result match {
case (Some(counter)) => (<success>Tick: {counter}</success>)
case _ => (<error>Error in counter</error>)
}
@ -147,4 +147,4 @@ class SecureTickActor extends Transactor with Logging {
self.reply(new Integer(0))
}
}
}
}

View file

@ -1,26 +1,26 @@
####################
# Akka Config File #
####################
# This file has all the default settings, so all these could be removed with no visible effect.
# Modify as needed.
<log>
filename = "./logs/akka.log"
roll = "daily" # Options: never, hourly, daily, sunday/monday/...
level = "DEBUG" # Options: fatal, critical, error, warning, info, debug, trace
level = "debug" # Options: fatal, critical, error, warning, info, debug, trace
console = on
# syslog_host = ""
# syslog_server_name = ""
</log>
<akka>
version = "0.9"
# FQN (Fully Qualified Name) to the class doing initial active object/actor
# supervisor bootstrap, should be defined in default constructor
boot = ["sample.camel.Boot",
"sample.rest.java.Boot",
"sample.rest.java.Boot",
"sample.rest.scala.Boot",
"sample.security.Boot"]
@ -29,9 +29,9 @@
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
</actor>
<dispatch>
<dispatcher>
throughput = 5 # default throughput for ExecutorBasedEventDrivenDispatcher
</dispatch>
</dispatcher>
<stm>
service = on
@ -41,10 +41,10 @@
jta-aware = off # 'on' means that if there JTA Transaction Manager available then the STM will
# begin (or join), commit or rollback the JTA transaction. Default is 'off'.
</stm>
<jta>
provider = "from-jndi" # Options: "from-jndi" (means that Akka will try to detect a TransactionManager in the JNDI)
# "atomikos" (means that Akka will use the Atomikos based JTA impl in 'akka-jta',
provider = "from-jndi" # Options: "from-jndi" (means that Akka will try to detect a TransactionManager in the JNDI)
# "atomikos" (means that Akka will use the Atomikos based JTA impl in 'akka-jta',
# e.g. you need the akka-jta JARs on classpath).
timeout = 60000
</jta>
@ -56,7 +56,7 @@
filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use
resource_packages = ["sample.rest.scala","sample.rest.java","sample.security"] # List with all resource packages for your Jersey services
authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now)
#IF you are using a KerberosAuthenticationActor
# <kerberos>
# servicePrincipal = "HTTP/localhost@EXAMPLE.COM"
@ -73,11 +73,10 @@
<cluster>
service = on
name = "default" # The name of the cluster
actor = "se.scalablesolutions.akka.remote.JGroupsClusterActor" # FQN of an implementation of ClusterActor
serializer = "se.scalablesolutions.akka.serialization.Serializer$Java$" # FQN of the serializer class
</cluster>
<server>
<server>
service = on
hostname = "localhost"
port = 9999
@ -89,14 +88,14 @@
read-timeout = 10000 # in millis (10 sec default)
</client>
</remote>
<storage>
<cassandra>
hostname = "127.0.0.1" # IP address or hostname of one of the Cassandra cluster's seeds
port = 9160
consistency-level = "QUORUM" # Options: ZERO, ONE, QUORUM, DCQUORUM, DCQUORUMSYNC, ALL, ANY
</cassandra>
<mongodb>
hostname = "127.0.0.1" # IP address or hostname of the MongoDB DB instance
port = 27017
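These settings are read at runtime through Akka's Config object. A minimal sketch of looking up the remote-server block above, assuming Akka 0.9's Configgy-backed API (Config.config with getString/getInt taking defaults); treat the exact method names as assumptions:

    import se.scalablesolutions.akka.config.Config.config

    // Defaults mirror the values in akka.conf above.
    val hostname = config.getString("akka.remote.server.hostname", "localhost")
    val port     = config.getInt("akka.remote.server.port", 9999)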

View file

@ -21,15 +21,15 @@
<!-- Basic Configuration -->
<!--======================================================================-->
<!--
<!--
~ The name of this cluster. This is mainly used to prevent machines in
~ one logical cluster from joining another.
-->
<ClusterName>akka</ClusterName>
<!--
~ Turn on to make new [non-seed] nodes automatically migrate the right data
~ to themselves. (If no InitialToken is specified, they will pick one
~ Turn on to make new [non-seed] nodes automatically migrate the right data
~ to themselves. (If no InitialToken is specified, they will pick one
~ such that they will get half the range of the most-loaded node.)
~ If a node starts up without bootstrapping, it will mark itself bootstrapped
~ so that you can't subsequently accidentally bootstrap a node with
@ -66,11 +66,11 @@
~ and LongType. You can also specify the fully-qualified class
~ name to a class of your choice extending
~ org.apache.cassandra.db.marshal.AbstractType.
~
~
~ SuperColumns have a similar CompareSubcolumnsWith attribute.
~
~
~ BytesType: Simple sort by byte value. No validation is performed.
~ AsciiType: Like BytesType, but validates that the input can be
~ AsciiType: Like BytesType, but validates that the input can be
~ parsed as US-ASCII.
~ UTF8Type: A string encoded as UTF8
~ LongType: A 64bit long
@ -82,7 +82,7 @@
~
~ An optional `Comment` attribute may be used to attach additional
~ human-readable information about the column family to its definition.
~
~
~ The optional KeysCached attribute specifies
~ the number of keys per sstable whose locations we keep in
~ memory in "mostly LRU" order. (JUST the key locations, NOT any
@ -94,25 +94,25 @@
~ whose entire contents we cache in memory. Do not use this on
~ ColumnFamilies with large rows, or ColumnFamilies with high write:read
~ ratios. Specify a fraction (value less than 1), a percentage (ending in
~ a % sign) or an absolute number of rows to cache.
~ a % sign) or an absolute number of rows to cache.
~ RowsCached defaults to 0, i.e., row cache is off by default.
~
~ Remember, when using caches as a percentage, they WILL grow with
~ your data set!
-->
<ColumnFamily Name="map"
CompareWith="UTF8Type"
<ColumnFamily Name="map"
CompareWith="UTF8Type"
KeysCached="100%" />
<!-- FIXME: change vector to a super column -->
<ColumnFamily Name="vector"
CompareWith="UTF8Type"
<ColumnFamily Name="vector"
CompareWith="UTF8Type"
KeysCached="100%" />
<ColumnFamily Name="ref"
CompareWith="UTF8Type"
<ColumnFamily Name="ref"
CompareWith="UTF8Type"
KeysCached="100%" />
<!--ColumnFamily Name="Standard1" CompareWith="BytesType"/>
<ColumnFamily Name="Standard2"
<ColumnFamily Name="Standard2"
CompareWith="UTF8Type"
KeysCached="100%"/>
<ColumnFamily Name="StandardByUUID1" CompareWith="TimeUUIDType" />
@ -150,7 +150,7 @@
~ and PropertyFileEndPointSnitch is available in contrib/.
-->
<EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
</Keyspace>
</Keyspaces>
@ -158,7 +158,7 @@
~ Authenticator: any IAuthenticator may be used, including your own as long
~ as it is on the classpath. Out of the box, Cassandra provides
~ org.apache.cassandra.auth.AllowAllAuthenticator and
~ org.apache.cassandra.auth.SimpleAuthenticator
~ org.apache.cassandra.auth.SimpleAuthenticator
~ (SimpleAuthenticator uses access.properties and passwd.properties by
~ default).
~
@ -188,7 +188,7 @@
~ are sent to the node with the "closest" token, so distributing your
~ tokens equally along the key distribution space will spread keys
~ evenly across your cluster.) This setting is only checked the first
~ time a node is started.
~ time a node is started.
~ This can also be useful with RandomPartitioner to force equal spacing
~ of tokens around the hash space, especially for clusters with a small
@ -227,9 +227,9 @@
<!-- Local hosts and ports -->
<!--
<!--
~ Address to bind to and tell other nodes to connect to. You _must_
~ change this if you want multiple nodes to be able to communicate!
~ change this if you want multiple nodes to be able to communicate!
~
~ Leaving it blank leaves it up to InetAddress.getLocalHost(). This
~ will always do the Right Thing *if* the node is properly configured
@ -251,9 +251,9 @@
<ThriftAddress>localhost</ThriftAddress>
<!-- Thrift RPC port (the port clients connect to). -->
<ThriftPort>9160</ThriftPort>
<!--
<!--
~ Whether or not to use a framed transport for Thrift. If this option
~ is set to true then you must also use a framed transport on the
~ is set to true then you must also use a framed transport on the
~ client-side, (framed and non-framed transports are not compatible).
-->
<ThriftFramedTransport>false</ThriftFramedTransport>
@ -285,16 +285,16 @@
<!--
~ Buffer size to use when performing contiguous column slices. Increase
~ this to the size of the column slices you typically perform.
~ (Name-based queries are performed with a buffer size of
~ this to the size of the column slices you typically perform.
~ (Name-based queries are performed with a buffer size of
~ ColumnIndexSizeInKB.)
-->
<SlicedBufferSizeInKB>64</SlicedBufferSizeInKB>
<!--
~ Buffer size to use when flushing memtables to disk. (Only one
~ Buffer size to use when flushing memtables to disk. (Only one
~ memtable is ever flushed at a time.) Increase (decrease) the index
~ buffer size relative to the data buffer if you have few (many)
~ buffer size relative to the data buffer if you have few (many)
~ columns per key. Bigger is only better _if_ your memtables get large
~ enough to use the space. (Check in your data directory after your
~ app has been running long enough.) -->
@ -314,7 +314,7 @@
<!--
~ Flush memtable after this much data has been inserted, including
~ overwritten data. There is one memtable per column family, and
~ overwritten data. There is one memtable per column family, and
~ this threshold is based solely on the amount of data stored, not
~ actual heap memory usage (there is some overhead in indexing the
~ columns).
@ -379,7 +379,7 @@
~ individually). Reasonable values range from a minimal 0.1 to 10 or
~ even more if throughput matters more than latency.
-->
<!-- <CommitLogSyncBatchWindowInMS>1</CommitLogSyncBatchWindowInMS> -->
<!-- <CommitLogSyncBatchWindowInMS>1</CommitLogSyncBatchWindowInMS> -->
<!--
~ Time to wait before garbage-collecting deletion markers. Set this to

View file

@ -5,7 +5,7 @@
import sbt._
import sbt.CompileOrder._
import spde._
import java.util.jar.Attributes
import java.util.jar.Attributes.Name._
import java.io.File
@ -14,11 +14,11 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
// ------------------------------------------------------------
// project versions
val JERSEY_VERSION = "1.1.5"
val ATMO_VERSION = "0.5.4"
val JERSEY_VERSION = "1.2"
val ATMO_VERSION = "0.6-SNAPSHOT"
val CASSANDRA_VERSION = "0.6.1"
val LIFT_VERSION = "2.0-scala280-SNAPSHOT"
val SCALATEST_VERSION = "1.2-for-scala-2.8.0.RC2-SNAPSHOT"
val SCALATEST_VERSION = "1.2-for-scala-2.8.0.RC3-SNAPSHOT"
val MULTIVERSE_VERSION = "0.5.2"
// ------------------------------------------------------------
@ -26,11 +26,11 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
lazy val distPath = info.projectPath / "dist"
override def compileOptions = super.compileOptions ++
Seq("-deprecation",
"-Xmigration",
"-Xcheckinit",
"-Xstrict-warnings",
"-Xwarninit",
Seq("-deprecation",
"-Xmigration",
"-Xcheckinit",
"-Xstrict-warnings",
"-Xwarninit",
"-encoding", "utf8")
.map(x => CompileOption(x))
@ -59,20 +59,13 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", sunjdmkRepo)
val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", sunjdmkRepo)
def javaNetRepo = "java.net Repo" at "http://download.java.net/maven/2"
def sonatypeSnapshotRepo = "Sonatype OSS Repo" at "http://oss.sonatype.org/content/repositories/snapshots"
val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", javaNetRepo)
val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", javaNetRepo)
val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", javaNetRepo)
val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", javaNetRepo)
val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", sonatypeSnapshotRepo)
val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots)
/* These are not needed and can possibly be deleted.
val databinder = "DataBinder" at "http://databinder.net/repo"
// val configgy = "Configgy" at "http://www.lag.net/repo"
val codehaus = "Codehaus" at "http://repository.codehaus.org"
val codehaus_snapshots = "Codehaus Snapshots" at "http://snapshots.repository.codehaus.org"
val google = "Google" at "http://google-maven-repository.googlecode.com/svn/repository"
*/
// ------------------------------------------------------------
// project definitions
lazy val akka_core = project("akka-core", "akka-core", new AkkaCoreProject(_))
@ -85,8 +78,8 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
lazy val akka_kernel = project("akka-kernel", "akka-kernel", new AkkaKernelProject(_),
akka_core, akka_http, akka_spring, akka_camel, akka_persistence, akka_amqp)
// functional tests in java
lazy val akka_fun_test = project("akka-fun-test-java", "akka-fun-test-java", new AkkaFunTestProject(_), akka_kernel)
// active object tests in java
lazy val akka_active_object_test = project("akka-active-object-test", "akka-active-object-test", new AkkaActiveObjectTestProject(_), akka_kernel)
// examples
lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_))
@ -107,6 +100,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
// create a manifest with all akka jars and dependency jars on classpath
override def manifestClassPath = Some(allArtifacts.getFiles
.filter(_.getName.endsWith(".jar"))
.filter(!_.getName.contains("servlet_2.4"))
.filter(!_.getName.contains("scala-library"))
.map("lib_managed/scala_%s/compile/".format(buildScalaVersion) + _.getName)
.mkString(" ") +
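The resulting manifest entry is a space-separated list of the managed jars, minus the scala-library and servlet_2.4 jars filtered out above. It is shaped roughly like this (jar names are hypothetical; a real MANIFEST.MF wraps long values with one-space continuation lines):

    Class-Path: lib_managed/scala_2.8.0.RC3/compile/akka-core_2.8.0.RC3-0.9.jar
     lib_managed/scala_2.8.0.RC3/compile/jgroups-2.9.0.GA.jar ...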
@ -157,21 +151,21 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
<distribution>repo</distribution>
</license>
</licenses>
// publish to local mvn
import Process._
lazy val publishLocalMvn = runMvnInstall
lazy val publishLocalMvn = runMvnInstall
def runMvnInstall = task {
for(absPath <- akkaArtifacts.getPaths) {
for (absPath <- akkaArtifacts.getPaths) {
val artifactRE = """(.*)/dist/(.*)-(.*).jar""".r
val artifactRE(path, artifactId, artifactVersion) = absPath
val command = "mvn install:install-file" +
val artifactRE(path, artifactId, artifactVersion) = absPath
val command = "mvn install:install-file" +
" -Dfile=" + absPath +
" -DgroupId=se.scalablesolutions.akka" +
" -DartifactId=" + artifactId +
" -DgroupId=se.scalablesolutions.akka" +
" -DartifactId=" + artifactId +
" -Dversion=" + version +
" -Dpackaging=jar -DgeneratePom=true"
command ! log
command ! log
}
None
} dependsOn(dist) describedAs("Run mvn install for artifacts in dist.")
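For one jar in dist/, the regex groups and string concatenation above expand to a command like the following (file path hypothetical; note that -Dversion uses the project's version val, not the version parsed from the file name):

    mvn install:install-file \
        -Dfile=/path/to/akka/dist/akka-core_2.8.0.RC3-0.9.jar \
        -DgroupId=se.scalablesolutions.akka \
        -DartifactId=akka-core_2.8.0.RC3 \
        -Dversion=0.9 \
        -Dpackaging=jar -DgeneratePom=true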
@ -198,7 +192,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0" % "compile"
val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile"
val jgroups = "jgroups" % "jgroups" % "2.9.0.GA" % "compile"
// testing
val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test"
val junit = "junit" % "junit" % "4.5" % "test"
@ -226,7 +220,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val annotation = "javax.annotation" % "jsr250-api" % "1.0" % "compile"
val lift_common = "net.liftweb" % "lift-common" % LIFT_VERSION % "compile"
val lift_util = "net.liftweb" % "lift-util" % LIFT_VERSION % "compile"
// testing
val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test"
val junit = "junit" % "junit" % "4.5" % "test"
@ -296,19 +290,12 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
}
// ================= TEST ==================
class AkkaFunTestProject(info: ProjectInfo) extends DefaultProject(info) {
val jackson_core_asl = "org.codehaus.jackson" % "jackson-core-asl" % "1.2.1" % "compile"
val stax_api = "javax.xml.stream" % "stax-api" % "1.0-2" % "compile"
val protobuf = "com.google.protobuf" % "protobuf-java" % "2.2.0" % "compile"
val grizzly = "com.sun.grizzly" % "grizzly-comet-webserver" % "1.9.18-i" % "compile"
val jersey_server = "com.sun.jersey" % "jersey-server" % JERSEY_VERSION % "compile"
val jersey_json = "com.sun.jersey" % "jersey-json" % JERSEY_VERSION % "compile"
val jersey_atom = "com.sun.jersey" % "jersey-atom" % JERSEY_VERSION % "compile"
class AkkaActiveObjectTestProject(info: ProjectInfo) extends DefaultProject(info) {
// testing
val junit = "junit" % "junit" % "4.5" % "test"
val jmock = "org.jmock" % "jmock" % "2.4.0" % "test"
}
// ================= EXAMPLES ==================
class AkkaSampleAntsProject(info: ProjectInfo) extends DefaultSpdeProject(info) {
val scalaToolsSnapshots = ScalaToolsSnapshots
@ -395,13 +382,10 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
!jar.toString.endsWith("scala-library-2.7.7.jar")
)
}
def akkaArtifacts = {
descendents(info.projectPath / "dist", "*" + buildScalaVersion + "-" + version + ".jar")
}
def akkaArtifacts = descendents(info.projectPath / "dist", "*" + buildScalaVersion + "-" + version + ".jar")
// ------------------------------------------------------------
class AkkaDefaultProject(info: ProjectInfo, val deployPath: Path) extends DefaultProject(info) with DeployProject
trait DeployProject extends DefaultProject {
@ -411,7 +395,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
lazy val dist = distAction
def distAction = deployTask(jarPath, packageDocsJar, packageSrcJar, deployPath, true, true, true) dependsOn(
`package`, packageDocs, packageSrc) describedAs("Deploying")
def deployTask(jar: Path, docs: Path, src: Path, toDir: Path,
def deployTask(jar: Path, docs: Path, src: Path, toDir: Path,
genJar: Boolean, genDocs: Boolean, genSource: Boolean) = task {
gen(jar, toDir, genJar, "Deploying bits") orElse
gen(docs, toDir, genDocs, "Deploying docs") orElse

View file

@ -5,4 +5,4 @@ class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
val spdeSbt = "us.technically.spde" % "spde-sbt-plugin" % "0.4.1"
// val repo = "GH-pages repo" at "http://mpeltonen.github.com/maven/"
// val idea = "com.github.mpeltonen" % "sbt-idea-plugin" % "0.1-SNAPSHOT"
}
}

View file

@ -1,2 +1,2 @@
#!/bin/sh
sed -i '' 's/[[:space:]]*$//g' **/*.*
sed -i '' 's/[[:space:]]*$//g' **/*.scala
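Note that the **/*.scala glob only recurses in shells with zsh-style globbing; under the plain /bin/sh shebang a find-based equivalent is more portable, e.g. (keeping the BSD sed -i '' form used above):

    find . -name '*.scala' -exec sed -i '' 's/[[:space:]]*$//g' {} +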

View file

@ -1,6 +1,6 @@
#!/bin/bash
cd $AKKA_HOME
VERSION=akka_2.8.0.Beta1-0.8
VERSION=akka_2.8.0.RC3-0.9
TARGET_DIR=dist/$VERSION/$1
shift 1
VMARGS=$@
@ -8,8 +8,8 @@ VMARGS=$@
if [ -d $TARGET_DIR ]; then
cd $TARGET_DIR
else
unzip dist/${VERSION}.zip -d $TARGET_DIR
cd $TARGET_DIR
unzip dist/${VERSION}.zip -d $TARGET_DIR
cd $TARGET_DIR
fi
export AKKA_HOME=`pwd`
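Typical invocation, assuming AKKA_HOME points at a checkout with dist/akka_2.8.0.RC3-0.9.zip already built (script name, target directory, and VM args are illustrative; the remainder of the script, not shown here, runs with the re-exported AKKA_HOME):

    export AKKA_HOME=~/src/akka
    $AKKA_HOME/run.sh mynode -Xmx512m
    # first run unzips dist/akka_2.8.0.RC3-0.9.zip into dist/akka_2.8.0.RC3-0.9/mynode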