Merge branch 'master' into Atmosphere0.5

This commit is contained in:
Viktor Klang 2009-12-09 20:57:29 +01:00
commit 53e85dbb8f
30 changed files with 918 additions and 855 deletions

View file

@ -17,11 +17,10 @@ import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.nio.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util.Helpers.ReadWriteLock
import se.scalablesolutions.akka.util.Logging
import org.codehaus.aspectwerkz.proxy.Uuid
import org.multiverse.api.ThreadLocalTransaction._
import se.scalablesolutions.akka.util.{HashCode, Logging}
/**
* Mix in this trait to give an actor TransactionRequired semantics.
@ -116,7 +115,7 @@ object Actor extends Logging {
*
*/
def actor[A](body: => Unit) = {
def handler[A](body: Unit) = new {
def handler[A](body: => Unit) = new {
def receive(handler: PartialFunction[Any, Unit]) = new Actor() {
start
body
@ -215,6 +214,7 @@ trait Actor extends TransactionManagement {
implicit protected val self: Actor = this
// FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait
// Only mutable for RemoteServer in order to maintain identity across nodes
private[akka] var _uuid = Uuid.newUuid.toString
def uuid = _uuid
@ -471,7 +471,7 @@ trait Actor extends TransactionManagement {
* actor.send(message)
* </pre>
*/
def !(message: AnyRef)(implicit sender: AnyRef) = {
def !(message: Any)(implicit sender: AnyRef) = {
val from = if (sender != null && sender.isInstanceOf[Actor]) Some(sender.asInstanceOf[Actor])
else None
if (_isRunning) postMessageToMailbox(message, from)
@ -482,7 +482,7 @@ trait Actor extends TransactionManagement {
/**
* Same as the '!' method but does not take an implicit sender as second parameter.
*/
def send(message: AnyRef) = {
def send(message: Any) = {
if (_isRunning) postMessageToMailbox(message, None)
else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
@ -500,7 +500,7 @@ trait Actor extends TransactionManagement {
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def !![T](message: AnyRef, timeout: Long): Option[T] = if (_isRunning) {
def !![T](message: Any, timeout: Long): Option[T] = if (_isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
val isActiveObject = message.isInstanceOf[Invocation]
if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.completeWithResult(None)
@ -527,19 +527,19 @@ trait Actor extends TransactionManagement {
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def !![T](message: AnyRef): Option[T] = !![T](message, timeout)
def !![T](message: Any): Option[T] = !![T](message, timeout)
/**
* This method is evil and has been removed. Use '!!' with a timeout instead.
*/
def !?[T](message: AnyRef): T = throw new UnsupportedOperationException(
def !?[T](message: Any): T = throw new UnsupportedOperationException(
"'!?' is evil and has been removed. Use '!!' with a timeout instead")
/**
* Use <code>reply(..)</code> to reply with a message to the original sender of the message currently
* being processed.
*/
protected[this] def reply(message: AnyRef) = {
protected[this] def reply(message: Any) = {
sender match {
case Some(senderActor) =>
senderActor ! message
@ -723,9 +723,9 @@ trait Actor extends TransactionManagement {
actor
}
// ================================
// ==== IMPLEMENTATION DETAILS ====
// ================================
// =========================================
// ==== INTERNAL IMPLEMENTATION DETAILS ====
// =========================================
private def spawnButDoNotStart[T <: Actor](actorClass: Class[T]): T = {
val actor = actorClass.newInstance.asInstanceOf[T]
@ -736,7 +736,7 @@ trait Actor extends TransactionManagement {
actor
}
private def postMessageToMailbox(message: AnyRef, sender: Option[Actor]): Unit = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
private def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
@ -756,7 +756,7 @@ trait Actor extends TransactionManagement {
}
}
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long):
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any, timeout: Long):
CompletableFutureResult = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
@ -848,7 +848,7 @@ trait Actor extends TransactionManagement {
} else proceed
} catch {
case e =>
Actor.log.error(e, "Could not invoke actor [%s]", this)
Actor.log.error(e, "Exception when invoking actor [%s] with message [%s]", this, message)
if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e)
clearTransaction // need to clear currentTransaction before call to supervisor
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
@ -865,7 +865,7 @@ trait Actor extends TransactionManagement {
private def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive)
private val lifeCycles: PartialFunction[Any, Unit] = {
case Init(config) => init(config)
case Init(config) => _config = Some(config); init(config)
case HotSwap(code) => _hotswap = code
case Restart(reason) => restart(reason)
case Exit(dead, reason) => handleTrapExit(dead, reason)
@ -897,8 +897,9 @@ trait Actor extends TransactionManagement {
case Permanent =>
actor.restart(reason)
case Temporary =>
Actor.log.info("Actor [%s] configured as TEMPORARY will not be restarted.", actor.id)
Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", actor.id)
getLinkedActors.remove(actor) // remove the temporary actor
actor.stop
}
}
}
@ -958,5 +959,17 @@ trait Actor extends TransactionManagement {
} else message
} else message
override def hashCode(): Int = {
var result = HashCode.SEED
result = HashCode.hash(result, _uuid)
result
}
override def equals(that: Any): Boolean = {
that != null &&
that.isInstanceOf[Actor] &&
that.asInstanceOf[Actor]._uuid == _uuid
}
override def toString(): String = "Actor[" + id + ":" + uuid + "]"
}

View file

@ -13,7 +13,7 @@ import scala.collection.mutable.HashMap
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ActorRegistry {
object ActorRegistry extends Logging {
private val actorsByClassName = new HashMap[String, List[Actor]]
private val actorsById = new HashMap[String, List[Actor]]
@ -48,4 +48,11 @@ object ActorRegistry {
actorsByClassName - actor.getClass.getName
actorsById - actor.getClass.getName
}
// TODO: document ActorRegistry.shutdownAll
def shutdownAll = {
log.info("Shutting down all actors in the system...")
actorsById.foreach(entry => entry._2.map(_.stop))
log.info("All actors have been shut down")
}
}

View file

@ -126,14 +126,13 @@ class EventBasedThreadPoolDispatcher(name: String, private val concurrentMode: B
val iterator = invocations.iterator
while (iterator.hasNext) {
val invocation = iterator.next
if (invocation == null) throw new IllegalStateException("Message invocation is null [" + invocation + "]")
if (concurrentMode) {
val invoker = messageHandlers.get(invocation.receiver)
if (invocation == null) throw new IllegalStateException("Message invocation is null [" + invocation + "]")
if (invoker == null) throw new IllegalStateException("Message invoker for invocation [" + invocation + "] is null")
result.put(invocation, invoker)
result.put(invocation, invoker)
} else if (!busyInvokers.contains(invocation.receiver)) {
val invoker = messageHandlers.get(invocation.receiver)
if (invocation == null) throw new IllegalStateException("Message invocation is null [" + invocation + "]")
if (invoker == null) throw new IllegalStateException("Message invoker for invocation [" + invocation + "] is null")
result.put(invocation, invoker)
busyInvokers.add(invocation.receiver)

View file

@ -18,12 +18,12 @@ sealed trait FutureResult {
def isCompleted: Boolean
def isExpired: Boolean
def timeoutInNanos: Long
def result: Option[AnyRef]
def result: Option[Any]
def exception: Option[Tuple2[AnyRef, Throwable]]
}
trait CompletableFutureResult extends FutureResult {
def completeWithResult(result: AnyRef)
def completeWithResult(result: Any)
def completeWithException(toBlame: AnyRef, exception: Throwable)
}
@ -36,7 +36,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
private val _lock = new ReentrantLock
private val _signal = _lock.newCondition
private var _completed: Boolean = _
private var _result: Option[AnyRef] = None
private var _result: Option[Any] = None
private var _exception: Option[Tuple2[AnyRef, Throwable]] = None
def await = try {
@ -79,7 +79,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
_lock.unlock
}
def result: Option[AnyRef] = try {
def result: Option[Any] = try {
_lock.lock
_result
} finally {
@ -93,7 +93,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
_lock.unlock
}
def completeWithResult(result: AnyRef) = try {
def completeWithResult(result: Any) = try {
_lock.lock
if (!_completed) {
_completed = true

View file

@ -38,7 +38,7 @@ trait MessageDemultiplexer {
}
class MessageInvocation(val receiver: Actor,
val message: AnyRef,
val message: Any,
val future: Option[CompletableFutureResult],
val sender: Option[Actor],
val tx: Option[Transaction]) {
@ -67,10 +67,12 @@ class MessageInvocation(val receiver: Actor,
}
override def toString(): String = synchronized {
"MessageInvocation[message = " + message +
", receiver = " + receiver +
", sender = " + sender +
", future = " + future +
", tx = " + tx + "]"
"MessageInvocation[" +
"\n\tmessage = " + message +
"\n\treceiver = " + receiver +
"\n\tsender = " + sender +
"\n\tfuture = " + future +
"\n\ttx = " + tx +
"\n]"
}
}

View file

@ -4,14 +4,14 @@
package se.scalablesolutions.akka.nio
import akka.serialization.Serializable.SBinary
import se.scalablesolutions.akka.serialization.Serializable.SBinary
import se.scalablesolutions.akka.serialization.{Serializer, Serializable, SerializationProtocol}
import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
import com.google.protobuf.{Message, ByteString}
import serialization.{Serializer, Serializable, SerializationProtocol}
import protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
object RemoteProtocolBuilder {
def getMessage(request: RemoteRequest): AnyRef = {
def getMessage(request: RemoteRequest): Any = {
request.getProtocol match {
case SerializationProtocol.SBINARY =>
val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
@ -26,13 +26,13 @@ object RemoteProtocolBuilder {
val messageClass = Serializer.Java.in(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
Serializer.Protobuf.in(request.getMessage.toByteArray, Some(messageClass))
case SerializationProtocol.JAVA =>
Serializer.Java.in(request.getMessage.toByteArray, None)
unbox(Serializer.Java.in(request.getMessage.toByteArray, None))
case SerializationProtocol.AVRO =>
throw new UnsupportedOperationException("Avro protocol is not yet supported")
}
}
def getMessage(reply: RemoteReply): AnyRef = {
def getMessage(reply: RemoteReply): Any = {
reply.getProtocol match {
case SerializationProtocol.SBINARY =>
val renderer = Class.forName(new String(reply.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
@ -47,15 +47,15 @@ object RemoteProtocolBuilder {
val messageClass = Serializer.Java.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
Serializer.Protobuf.in(reply.getMessage.toByteArray, Some(messageClass))
case SerializationProtocol.JAVA =>
Serializer.Java.in(reply.getMessage.toByteArray, None)
unbox(Serializer.Java.in(reply.getMessage.toByteArray, None))
case SerializationProtocol.AVRO =>
throw new UnsupportedOperationException("Avro protocol is not yet supported")
}
}
def setMessage(message: AnyRef, builder: RemoteRequest.Builder) = {
def setMessage(message: Any, builder: RemoteRequest.Builder) = {
if (message.isInstanceOf[Serializable.SBinary[_]]) {
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: AnyRef]]
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]]
builder.setProtocol(SerializationProtocol.SBINARY)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
@ -68,22 +68,22 @@ object RemoteProtocolBuilder {
val serializable = message.asInstanceOf[Serializable.ScalaJSON]
builder.setProtocol(SerializationProtocol.SCALA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.asInstanceOf[AnyRef].getClass.getName.getBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else if (message.isInstanceOf[Serializable.JavaJSON]) {
val serializable = message.asInstanceOf[Serializable.JavaJSON]
builder.setProtocol(SerializationProtocol.JAVA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.asInstanceOf[AnyRef].getClass.getName.getBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else {
// default: if no protocol is specified explicitly, fall back to Java serialization
builder.setProtocol(SerializationProtocol.JAVA)
builder.setMessage(ByteString.copyFrom(Serializer.Java.out(message)))
builder.setMessage(ByteString.copyFrom(Serializer.Java.out(box(message))))
}
}
def setMessage(message: AnyRef, builder: RemoteReply.Builder) = {
def setMessage(message: Any, builder: RemoteReply.Builder) = {
if (message.isInstanceOf[Serializable.SBinary[_]]) {
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: AnyRef]]
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]]
builder.setProtocol(SerializationProtocol.SBINARY)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
@ -96,16 +96,41 @@ object RemoteProtocolBuilder {
val serializable = message.asInstanceOf[Serializable.ScalaJSON]
builder.setProtocol(SerializationProtocol.SCALA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.asInstanceOf[AnyRef].getClass.getName.getBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else if (message.isInstanceOf[Serializable.JavaJSON]) {
val serializable = message.asInstanceOf[Serializable.JavaJSON]
builder.setProtocol(SerializationProtocol.JAVA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.asInstanceOf[AnyRef].getClass.getName.getBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else {
// default: if no protocol is specified explicitly, fall back to Java serialization
builder.setProtocol(SerializationProtocol.JAVA)
builder.setMessage(ByteString.copyFrom(Serializer.Java.out(message)))
builder.setMessage(ByteString.copyFrom(Serializer.Java.out(box(message))))
}
}
private def box(value: Any): AnyRef = value match {
case value: Boolean => new java.lang.Boolean(value)
case value: Char => new java.lang.Character(value)
case value: Short => new java.lang.Short(value)
case value: Int => new java.lang.Integer(value)
case value: Long => new java.lang.Long(value)
case value: Float => new java.lang.Float(value)
case value: Double => new java.lang.Double(value)
case value: Byte => new java.lang.Byte(value)
case value => value.asInstanceOf[AnyRef]
}
private def unbox(value: AnyRef): Any = value match {
case value: java.lang.Boolean => value.booleanValue
case value: java.lang.Character => value.charValue
case value: java.lang.Short => value.shortValue
case value: java.lang.Integer => value.intValue
case value: java.lang.Long => value.longValue
case value: java.lang.Float => value.floatValue
case value: java.lang.Double => value.doubleValue
case value: java.lang.Byte => value.byteValue
case value => value
}
}

View file

@ -4,11 +4,12 @@
package se.scalablesolutions.akka.serialization
import com.google.protobuf.Message
import java.io.{ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream}
import reflect.{BeanProperty, Manifest}
import sbinary.DefaultProtocol
import com.google.protobuf.Message
import org.codehaus.jackson.map.ObjectMapper
import sjson.json.{Serializer =>SJSONSerializer}
/**

View file

@ -4,61 +4,59 @@
package se.scalablesolutions.akka.state
import scala.actors.Actor
import scala.actors.OutputChannel
import scala.actors.Future
import scala.actors.Actor._
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
import se.scalablesolutions.akka.actor.Actor
/**
* Implements Oz-style dataflow (single assignment) variables.
*
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object DataFlow {
def thread(body: => Unit) = {
object DataFlow {
case object Start
case object Exit
def thread(body: => Unit) = {
val thread = new IsolatedEventBasedThread(body).start
thread ! 'start
thread send Start
thread
}
def thread[MessageType, ReturnType](body: MessageType => ReturnType) =
new ReactiveEventBasedThread(body).start
private class IsolatedEventBasedThread(body: => Unit) extends Actor {
def act = loop {
react {
case 'start => body
case 'exit => exit()
}
def receive = {
case Start => body
case Exit => exit
}
}
private class ReactiveEventBasedThread[MessageType, ReturnType](body: MessageType => ReturnType) extends Actor {
def act = loop {
react {
case 'exit => exit()
case message => sender ! body(message.asInstanceOf[MessageType])
}
def thread[A <: AnyRef, R <: AnyRef](body: A => R) =
new ReactiveEventBasedThread(body).start
private class ReactiveEventBasedThread[A <: AnyRef, T <: AnyRef](body: A => T)
extends Actor {
def receive = {
case Exit => exit
case message => reply(body(message.asInstanceOf[A]))
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed class DataFlowVariable[T] {
private sealed abstract class DataFlowVariableMessage
private case class Set[T](value: T) extends DataFlowVariableMessage
private case object Get extends DataFlowVariableMessage
private val value = new AtomicReference[Option[T]](None)
private val blockedReaders = new ConcurrentLinkedQueue[Actor]
sealed class DataFlowVariable[T <: Any] {
val TIME_OUT = 10000
private class In[T](dataFlow: DataFlowVariable[T]) extends Actor {
def act = loop { react {
private sealed abstract class DataFlowVariableMessage
private case class Set[T <: Any](value: T) extends DataFlowVariableMessage
private case object Get extends DataFlowVariableMessage
private val value = new AtomicReference[Option[T]](None)
private val blockedReaders = new ConcurrentLinkedQueue[Actor]
private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
def receive = {
case Set(v) =>
if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) {
val iterator = dataFlow.blockedReaders.iterator
@ -66,73 +64,75 @@ object DataFlow {
dataFlow.blockedReaders.clear
} else throw new DataFlowVariableException(
"Attempt to change data flow variable (from [" + dataFlow.value.get + "] to [" + v + "])")
case 'exit => exit()
}}
case Exit => exit
}
}
private class Out[T](dataFlow: DataFlowVariable[T]) extends Actor {
var reader: Option[OutputChannel[Any]] = None
def act = loop { react {
private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
var reader: Option[Actor] = None
def receive = {
case Get =>
val ref = dataFlow.value.get
if (ref.isDefined) reply(ref.get) else reader = Some(sender)
if (ref.isDefined) reply(ref.get)
else reader = Some(sender.getOrElse(throw new IllegalStateException("No reader to DataFlowVariable is in scope")))
case Set(v) => if (reader.isDefined) reader.get ! v
case 'exit => exit()
}}
case Exit => exit
}
}
private[this] val in = { val in = new In(this); in.start; in }
def <<(ref: DataFlowVariable[T]) = in ! Set(ref())
def <<(ref: DataFlowVariable[T]) = in send Set(ref())
def <<(value: T) = in ! Set(value)
def apply(): T = {
def <<(value: T) = in send Set(value)
def apply(): T = {
val ref = value.get
if (ref.isDefined) ref.get
else {
val out = { val out = new Out(this); out.start; out }
blockedReaders.offer(out)
val future: Future[T] = out !! (Get, {case t: T => t})
val result = future()
out ! 'exit
result
val result = out !! (Get, TIME_OUT)
out send Exit
result.getOrElse(throw new DataFlowVariableException(
"Timed out (after " + TIME_OUT + " milliseconds) while waiting for result"))
}
}
def shutdown = in ! 'exit
def shutdown = in send Exit
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class DataFlowStream[T] extends Seq[T] {
class DataFlowStream[T <: Any] extends Seq[T] {
private[this] val queue = new LinkedBlockingQueue[DataFlowVariable[T]]
def <<<(ref: DataFlowVariable[T]) = queue.offer(ref)
def <<<(value: T) = {
def <<<(value: T) = {
val ref = new DataFlowVariable[T]
ref << value
queue.offer(ref)
}
}
def apply(): T = {
val ref = queue.take
ref()
}
def take: DataFlowVariable[T] = queue.take
//==== For Seq ====
def length: Int = queue.size
def apply(i: Int): T = {
if (i == 0) apply()
else throw new UnsupportedOperationException("Access by index other than '0' is not supported by DataFlowSream")
}
else throw new UnsupportedOperationException(
"Access by index other than '0' is not supported by DataFlowStream")
}
override def elements: Iterator[T] = new Iterator[T] {
private val iter = queue.iterator
def hasNext: Boolean = iter.hasNext
@ -141,7 +141,7 @@ object DataFlow {
override def toList: List[T] = queue.toArray.toList.asInstanceOf[List[T]]
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ -158,8 +158,8 @@ object Test1 extends Application {
// =======================================
// This example is from the Oz Wikipedia page: http://en.wikipedia.org/wiki/Oz_(programming_language)
/*
thread
/*
thread
Z = X+Y % will wait until both X and Y are bound to a value.
{Browse Z} % shows the value of Z.
end
@ -183,7 +183,7 @@ object Test2 extends Application {
/*
fun {Ints N Max}
if N == Max then nil
else
else
{Delay 1000}
N|{Ints N+1 Max}
end
@ -224,11 +224,11 @@ object Test2 extends Application {
object Test3 extends Application {
// Using DataFlowStream and foldLeft to calculate sum
/*
fun {Ints N Max}
if N == Max then nil
else
else
{Delay 1000}
N|{Ints N+1 Max}
end
@ -248,20 +248,20 @@ object Test3 extends Application {
import DataFlow._
def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
println("Generating int: " + n)
stream <<< n
ints(n + 1, max, stream)
}
def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
println("Calculating: " + s)
out <<< s
sum(in() + s, in, out)
}
def printSum(stream: DataFlowStream[Int]): Unit = {
println("Result: " + stream())
println("Result: " + stream())
printSum(stream)
}
@ -269,22 +269,22 @@ object Test3 extends Application {
val consumer = new DataFlowStream[Int]
thread { ints(0, 1000, producer) }
thread {
thread {
Thread.sleep(1000)
println("Sum: " + producer.map(x => x * x).foldLeft(0)(_ + _))
println("Sum: " + producer.map(x => x * x).foldLeft(0)(_ + _))
}
}
// =======================================
object Test4 extends Application {
object Test4 extends Application {
// Using DataFlowStream and recursive function to calculate sum
/*
fun {Ints N Max}
if N == Max then nil
else
else
{Delay 1000}
N|{Ints N+1 Max}
end
@ -304,20 +304,20 @@ object Test4 extends Application {
import DataFlow._
def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
println("Generating int: " + n)
stream <<< n
ints(n + 1, max, stream)
}
def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
println("Calculating: " + s)
out <<< s
sum(in() + s, in, out)
}
def printSum(stream: DataFlowStream[Int]): Unit = {
println("Result: " + stream())
println("Result: " + stream())
printSum(stream)
}
@ -332,6 +332,7 @@ object Test4 extends Application {
// =======================================
object Test5 extends Application {
import Actor.Sender.Self
import DataFlow._
// create four 'Int' data flow variables
@ -339,20 +340,20 @@ object Test5 extends Application {
val main = thread {
println("Thread 'main'")
x << 1
println("'x' set to: " + x())
println("Waiting for 'y' to be set...")
if (x() > y()) {
if (x() > y()) {
z << x
println("'z' set to 'x': " + z())
} else {
} else {
z << y
println("'z' set to 'y': " + z())
}
// main completed, shut down the data flow variables
x.shutdown
y.shutdown
@ -365,18 +366,20 @@ object Test5 extends Application {
Thread.sleep(5000)
y << 2
println("'y' set to: " + y())
}
}
val setV = thread {
println("Thread 'setV'")
v << y
println("'v' set to 'y': " + v())
println("'v' set to 'y': " + v())
}
// shut down the threads
main ! 'exit
setY ! 'exit
setV ! 'exit
// shut down the threads
main ! Exit
setY ! Exit
setV ! Exit
//System.gc
}

View file

@ -10,7 +10,7 @@ import java.util.concurrent.atomic.AtomicInteger
import se.scalablesolutions.akka.state.Committable
import se.scalablesolutions.akka.util.Logging
import org.multiverse.api.{Stm, Transaction => MultiverseTransaction}
import org.multiverse.api.{Transaction => MultiverseTransaction}
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
import org.multiverse.api.ThreadLocalTransaction._
import org.multiverse.templates.OrElseTemplate

View file

@ -1,10 +1,7 @@
package se.scalablesolutions.akka.actor
import junit.framework.TestCase
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import scala.collection.mutable.HashSet
class MemoryFootprintTest extends JUnitSuite {
class Mem extends Actor {
@ -13,20 +10,24 @@ class MemoryFootprintTest extends JUnitSuite {
}
}
val NR_OF_ACTORS = 100000
val MAX_MEMORY_FOOTPRINT_PER_ACTOR = 600
@Test
def shouldCreateManyActors = {
/* println("============== MEMORY TEST ==============")
val actors = new HashSet[Actor]
println("Total memory: " + Runtime.getRuntime.totalMemory)
(1 until 1000000).foreach {i =>
val mem = new Mem
actors += mem
if ((i % 100000) == 0) {
println("Nr actors: " + i)
println("Total memory: " + (Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory))
}
}
*/
assert(true)
def actorsShouldHaveLessMemoryFootprintThan630Bytes = {
println("============== MEMORY FOOTPRINT TEST ==============")
// warm up
(1 until 10000).foreach(i => new Mem)
// Actors are put in AspectRegistry when created so they won't be GCd here
val totalMem = Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory
(1 until NR_OF_ACTORS).foreach(i => new Mem)
val newTotalMem = Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory
val memPerActor = (newTotalMem - totalMem) / NR_OF_ACTORS
println("Memory footprint per actor is : " + memPerActor)
assert(memPerActor < MAX_MEMORY_FOOTPRINT_PER_ACTOR) // memory per actor should be less than 630 bytes
}
}

View file

@ -8,7 +8,7 @@ public class PersistentClasher {
@inittransactionalstate
public void init() {
state = PersistentState.newMap(new CassandraStorageConfig());
state = CassandraStorage.newMap();
}
public String getState(String key) {

View file

@ -12,18 +12,19 @@ public class PersistentStateful {
@inittransactionalstate
public void init() {
mapState = PersistentState.newMap(new CassandraStorageConfig());
vectorState = PersistentState.newVector(new CassandraStorageConfig());
refState = PersistentState.newRef(new CassandraStorageConfig());
mapState = CassandraStorage.newMap();
vectorState = CassandraStorage.newVector();
refState = CassandraStorage.newRef();
}
public String getMapState(String key) {
return (String) mapState.get(key).get();
byte[] bytes = (byte[]) mapState.get(key.getBytes()).get();
return new String(bytes, 0, bytes.length);
}
public String getVectorState(int index) {
return (String) vectorState.get(index);
byte[] bytes = (byte[]) vectorState.get(index);
return new String(bytes, 0, bytes.length);
}
public int getVectorLength() {
@ -32,62 +33,51 @@ public class PersistentStateful {
public String getRefState() {
if (refState.isDefined()) {
return (String) refState.get().get();
byte[] bytes = (byte[]) refState.get().get();
return new String(bytes, 0, bytes.length);
} else throw new IllegalStateException("No such element");
}
public void setMapState(String key, String msg) {
mapState.put(key, msg);
mapState.put(key.getBytes(), msg.getBytes());
}
public void setVectorState(String msg) {
vectorState.add(msg);
vectorState.add(msg.getBytes());
}
public void setRefState(String msg) {
refState.swap(msg);
refState.swap(msg.getBytes());
}
public void success(String key, String msg) {
mapState.put(key, msg);
vectorState.add(msg);
refState.swap(msg);
mapState.put(key.getBytes(), msg.getBytes());
vectorState.add(msg.getBytes());
refState.swap(msg.getBytes());
}
public String failure(String key, String msg, PersistentFailer failer) {
mapState.put(key, msg);
vectorState.add(msg);
refState.swap(msg);
mapState.put(key.getBytes(), msg.getBytes());
vectorState.add(msg.getBytes());
refState.swap(msg.getBytes());
failer.fail();
return msg;
}
public String success(String key, String msg, PersistentStatefulNested nested) {
mapState.put(key, msg);
vectorState.add(msg);
refState.swap(msg);
mapState.put(key.getBytes(), msg.getBytes());
vectorState.add(msg.getBytes());
refState.swap(msg.getBytes());
nested.success(key, msg);
return msg;
}
public String failure(String key, String msg, PersistentStatefulNested nested, PersistentFailer failer) {
mapState.put(key, msg);
vectorState.add(msg);
refState.swap(msg);
mapState.put(key.getBytes(), msg.getBytes());
vectorState.add(msg.getBytes());
refState.swap(msg.getBytes());
nested.failure(key, msg, failer);
return msg;
}
public void thisMethodHangs(String key, String msg, PersistentFailer failer) {
setMapState(key, msg);
}
}

View file

@ -12,18 +12,20 @@ public class PersistentStatefulNested {
@inittransactionalstate
public void init() {
mapState = PersistentState.newMap(new CassandraStorageConfig());
vectorState = PersistentState.newVector(new CassandraStorageConfig());
refState = PersistentState.newRef(new CassandraStorageConfig());
mapState = CassandraStorage.newMap();
vectorState = CassandraStorage.newVector();
refState = CassandraStorage.newRef();
}
public String getMapState(String key) {
return (String) mapState.get(key).get();
byte[] bytes = (byte[]) mapState.get(key.getBytes()).get();
return new String(bytes, 0, bytes.length);
}
public String getVectorState(int index) {
return (String) vectorState.get(index);
byte[] bytes = (byte[]) vectorState.get(index);
return new String(bytes, 0, bytes.length);
}
public int getVectorLength() {
@ -32,45 +34,36 @@ public class PersistentStatefulNested {
public String getRefState() {
if (refState.isDefined()) {
return (String) refState.get().get();
byte[] bytes = (byte[]) refState.get().get();
return new String(bytes, 0, bytes.length);
} else throw new IllegalStateException("No such element");
}
public void setMapState(String key, String msg) {
mapState.put(key, msg);
mapState.put(key.getBytes(), msg.getBytes());
}
public void setVectorState(String msg) {
vectorState.add(msg);
vectorState.add(msg.getBytes());
}
public void setRefState(String msg) {
refState.swap(msg);
refState.swap(msg.getBytes());
}
public String success(String key, String msg) {
mapState.put(key, msg);
vectorState.add(msg);
refState.swap(msg);
mapState.put(key.getBytes(), msg.getBytes());
vectorState.add(msg.getBytes());
refState.swap(msg.getBytes());
return msg;
}
public String failure(String key, String msg, PersistentFailer failer) {
mapState.put(key, msg);
vectorState.add(msg);
refState.swap(msg);
mapState.put(key.getBytes(), msg.getBytes());
vectorState.add(msg.getBytes());
refState.swap(msg.getBytes());
failer.fail();
return msg;
}
public void thisMethodHangs(String key, String msg, PersistentFailer failer) {
setMapState(key, msg);
}
}

View file

@ -14,6 +14,7 @@ import java.net.URLClassLoader
import se.scalablesolutions.akka.nio.RemoteNode
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.actor.ActorRegistry
/**
* The Akka Kernel.
@ -32,10 +33,14 @@ object Kernel extends Logging {
// FIXME add API to shut server down gracefully
@volatile private var hasBooted = false
private var jerseySelectorThread: SelectorThread = _
private var jerseySelectorThread: Option[SelectorThread] = None
private val startTime = System.currentTimeMillis
private var applicationLoader: Option[ClassLoader] = None
private lazy val remoteServerThread = new Thread(new Runnable() {
def run = RemoteNode.start(applicationLoader)
}, "Akka Remote Service")
def main(args: Array[String]) = boot
/**
@ -61,20 +66,33 @@ object Kernel extends Logging {
hasBooted = true
}
}
def startRemoteService = {
// FIXME manage remote serve thread for graceful shutdown
val remoteServerThread = new Thread(new Runnable() {
def run = RemoteNode.start(applicationLoader)
}, "Akka Remote Service")
remoteServerThread.start
// TODO document Kernel.shutdown
def shutdown = synchronized {
if (hasBooted) {
log.info("Shutting down Akka...")
ActorRegistry.shutdownAll
if (jerseySelectorThread.isDefined) {
log.info("Shutting down REST service (Jersey)")
jerseySelectorThread.get.stopEndpoint
}
if (remoteServerThread.isAlive) {
log.info("Shutting down remote service")
RemoteNode.shutdown
remoteServerThread.join(1000)
}
log.info("Akka succesfully shut down")
}
}
def startRemoteService = remoteServerThread.start
def startREST = {
val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build()
val scheme = uri.getScheme
if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException("The URI scheme, of the URI " + REST_URL + ", must be equal (ignoring case) to 'http'")
if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException(
"The URI scheme, of the URI " + REST_URL + ", must be equal (ignoring case) to 'http'")
val adapter = new ServletAdapter
adapter.setHandleStaticResources(true)
@ -83,19 +101,19 @@ object Kernel extends Logging {
//Using autodetection for now
//adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport")
if (HOME.isDefined) adapter.setRootFolder(HOME.get + "/deploy/root")
log.info("REST service root path: [" + adapter.getRootFolder + "] and context path [" + adapter.getContextPath + "] ")
log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolder, adapter.getContextPath)
val ah = new com.sun.grizzly.arp.DefaultAsyncHandler
ah.addAsyncFilter(new com.sun.grizzly.comet.CometAsyncFilter)
jerseySelectorThread = new SelectorThread
jerseySelectorThread.setAlgorithmClassName(classOf[StaticStreamAlgorithm].getName)
jerseySelectorThread.setPort(REST_PORT)
jerseySelectorThread.setAdapter(adapter)
jerseySelectorThread.setEnableAsyncExecution(true)
jerseySelectorThread.setAsyncHandler(ah)
jerseySelectorThread.listen
jerseySelectorThread = Some(new SelectorThread)
jerseySelectorThread.get.setAlgorithmClassName(classOf[StaticStreamAlgorithm].getName)
jerseySelectorThread.get.setPort(REST_PORT)
jerseySelectorThread.get.setAdapter(adapter)
jerseySelectorThread.get.setEnableAsyncExecution(true)
jerseySelectorThread.get.setAsyncHandler(ah)
jerseySelectorThread.get.listen
log.info("REST service started successfully. Listening to port [" + REST_PORT + "]")
log.info("REST service started successfully. Listening to port [%s]", REST_PORT)
}
private def runApplicationBootClasses = {
@ -113,7 +131,8 @@ object Kernel extends Logging {
new URLClassLoader(toDeploy.toArray, getClass.getClassLoader)
} else if (getClass.getClassLoader.getResourceAsStream("akka.conf") != null) {
getClass.getClassLoader
} else throw new IllegalStateException("AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
} else throw new IllegalStateException(
"AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
for (clazz <- BOOT_CLASSES) {
log.info("Loading boot class [%s]", clazz)
loader.loadClass(clazz).newInstance
@ -132,7 +151,7 @@ object Kernel extends Logging {
(____ /__|_ \__|_ \(____ /
\/ \/ \/ \/
""")
log.info(" Running version " + VERSION)
log.info(" Running version %s", VERSION)
log.info("==============================")
}
}

View file

@ -30,7 +30,7 @@
<dependency>
<groupId>com.mongodb</groupId>
<artifactId>mongo</artifactId>
<version>0.6</version>
<version>1.0</version>
</dependency>
<!-- For Cassandra -->
@ -49,7 +49,12 @@
<artifactId>commons-pool</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.13</version>
</dependency>
<!-- For Testing -->
<dependency>
<groupId>org.scalatest</groupId>

View file

@ -6,7 +6,6 @@ package se.scalablesolutions.akka.state
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.util.Helpers._
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.Config.config
import org.apache.cassandra.service._
@ -14,8 +13,14 @@ import org.apache.cassandra.service._
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object CassandraStorage extends MapStorage
with VectorStorage with RefStorage with Logging {
private[akka] object CassandraStorageBackend extends
MapStorageBackend[Array[Byte], Array[Byte]] with
VectorStorageBackend[Array[Byte]] with
RefStorageBackend[Array[Byte]] with
Logging {
type ElementType = Array[Byte]
val KEYSPACE = "akka"
val MAP_COLUMN_PARENT = new ColumnParent("map", null)
val VECTOR_COLUMN_PARENT = new ColumnParent("vector", null)
@ -31,35 +36,14 @@ object CassandraStorage extends MapStorage
case "ONE" => 1
case "QUORUM" => 2
case "ALL" => 3
case unknown => throw new IllegalArgumentException("Consistency level [" + unknown + "] is not supported. Expected one of [ZERO, ONE, QUORUM, ALL]")
case unknown => throw new IllegalArgumentException(
"Cassandra consistency level [" + unknown + "] is not supported. Expected one of [ZERO, ONE, QUORUM, ALL]")
}
}
val IS_ASCENDING = true
@volatile private[this] var isRunning = false
private[this] val protocol: Protocol = Protocol.Binary
/* {
config.getString("akka.storage.cassandra.procotol", "binary") match {
case "binary" => Protocol.Binary
case "json" => Protocol.JSON
case "simple-json" => Protocol.SimpleJSON
case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]")
}
}
*/
private[this] val serializer: Serializer = {
config.getString("akka.storage.cassandra.storage-format", "manual") match {
case "scala-json" => Serializer.ScalaJSON
case "java-json" => Serializer.JavaJSON
case "protobuf" => Serializer.Protobuf
case "java" => Serializer.Java
case "manual" => Serializer.NOOP
case "sbinary" => throw new UnsupportedOperationException("SBinary serialization protocol is not yet supported for storage")
case "avro" => throw new UnsupportedOperationException("Avro serialization protocol is not yet supported for storage")
case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]")
}
}
private[this] val sessions = new CassandraSessionPool(
KEYSPACE,
@ -71,22 +55,22 @@ object CassandraStorage extends MapStorage
// For Ref
// ===============================================================
def insertRefStorageFor(name: String, element: AnyRef) = {
def insertRefStorageFor(name: String, element: Array[Byte]) = {
sessions.withSession {
_ ++| (name,
new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY),
serializer.out(element),
element,
System.currentTimeMillis,
CONSISTENCY_LEVEL)
}
}
def getRefStorageFor(name: String): Option[AnyRef] = {
def getRefStorageFor(name: String): Option[Array[Byte]] = {
try {
val column: Option[ColumnOrSuperColumn] = sessions.withSession {
_ | (name, new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY))
}
if (column.isDefined) Some(serializer.in(column.get.getColumn.value, None))
if (column.isDefined) Some(column.get.getColumn.value)
else None
} catch {
case e =>
@ -99,40 +83,40 @@ object CassandraStorage extends MapStorage
// For Vector
// ===============================================================
def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {
sessions.withSession {
_ ++| (name,
new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(getVectorStorageSizeFor(name))),
serializer.out(element),
element,
System.currentTimeMillis,
CONSISTENCY_LEVEL)
}
}
// FIXME implement insertVectorStorageEntriesFor
def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
throw new UnsupportedOperationException("insertVectorStorageEntriesFor for CassandraStorage is not implemented yet")
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = {
throw new UnsupportedOperationException("insertVectorStorageEntriesFor for CassandraStorageBackend is not implemented yet")
}
def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) = {
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {
sessions.withSession {
_ ++| (name,
new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index)),
serializer.out(elem),
elem,
System.currentTimeMillis,
CONSISTENCY_LEVEL)
}
}
def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
val column: Option[ColumnOrSuperColumn] = sessions.withSession {
_ | (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index)))
}
if (column.isDefined) serializer.in(column.get.column.value, None)
if (column.isDefined) column.get.column.value
else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]")
}
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = {
val startBytes = if (start.isDefined) intToBytes(start.get) else null
val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null
val columns: List[ColumnOrSuperColumn] = sessions.withSession {
@ -143,7 +127,7 @@ object CassandraStorage extends MapStorage
count,
CONSISTENCY_LEVEL)
}
columns.map(column => serializer.in(column.getColumn.value, None))
columns.map(column => column.getColumn.value)
}
def getVectorStorageSizeFor(name: String): Int = {
@ -156,21 +140,21 @@ object CassandraStorage extends MapStorage
// For Map
// ===============================================================
def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = {
def insertMapStorageEntryFor(name: String, key: Array[Byte], element: Array[Byte]) = {
sessions.withSession {
_ ++| (name,
new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)),
serializer.out(element),
new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, key),
element,
System.currentTimeMillis,
CONSISTENCY_LEVEL)
}
}
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = {
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]) = {
val batch = new scala.collection.mutable.HashMap[String, List[ColumnOrSuperColumn]]
for (entry <- entries) {
val columnOrSuperColumn = new ColumnOrSuperColumn
columnOrSuperColumn.setColumn(new Column(serializer.out(entry._1), serializer.out(entry._2), System.currentTimeMillis))
columnOrSuperColumn.setColumn(new Column(entry._1, entry._2, System.currentTimeMillis))
batch + (MAP_COLUMN_PARENT.getColumn_family -> List(columnOrSuperColumn))
}
sessions.withSession {
@ -178,12 +162,12 @@ object CassandraStorage extends MapStorage
}
}
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = {
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = {
try {
val column: Option[ColumnOrSuperColumn] = sessions.withSession {
_ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)))
_ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, key))
}
if (column.isDefined) Some(serializer.in(column.get.getColumn.value, None))
if (column.isDefined) Some(column.get.getColumn.value)
else None
} catch {
case e =>
@ -192,13 +176,16 @@ object CassandraStorage extends MapStorage
}
}
def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = {
def getMapStorageFor(name: String): List[Tuple2[Array[Byte], Array[Byte]]] = {
val size = getMapStorageSizeFor(name)
sessions.withSession { session =>
val columns = session / (name, MAP_COLUMN_PARENT, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY, true, size, CONSISTENCY_LEVEL)
val columns = session /
(name, MAP_COLUMN_PARENT,
EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY,
true, size, CONSISTENCY_LEVEL)
for {
columnOrSuperColumn <- columns
entry = (serializer.in(columnOrSuperColumn.column.name, None), serializer.in(columnOrSuperColumn.column.value, None))
entry = (columnOrSuperColumn.column.name, columnOrSuperColumn.column.value)
} yield entry
}
}
@ -209,8 +196,8 @@ object CassandraStorage extends MapStorage
def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null)
def removeMapStorageFor(name: String, key: AnyRef): Unit = {
val keyBytes = if (key == null) null else serializer.out(key)
def removeMapStorageFor(name: String, key: Array[Byte]): Unit = {
val keyBytes = if (key == null) null else key
sessions.withSession {
_ -- (name,
new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, keyBytes),
@ -219,13 +206,13 @@ object CassandraStorage extends MapStorage
}
}
def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int):
List[Tuple2[AnyRef, AnyRef]] = {
val startBytes = if (start.isDefined) serializer.out(start.get) else null
val finishBytes = if (finish.isDefined) serializer.out(finish.get) else null
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int):
List[Tuple2[Array[Byte], Array[Byte]]] = {
val startBytes = if (start.isDefined) start.get else null
val finishBytes = if (finish.isDefined) finish.get else null
val columns: List[ColumnOrSuperColumn] = sessions.withSession {
_ / (name, MAP_COLUMN_PARENT, startBytes, finishBytes, IS_ASCENDING, count, CONSISTENCY_LEVEL)
}
columns.map(column => (column.getColumn.name, serializer.in(column.getColumn.value, None)))
columns.map(column => (column.getColumn.name, column.getColumn.value))
}
}

View file

@ -4,8 +4,8 @@
package se.scalablesolutions.akka.state
import util.Logging
import Config.config
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.Config.config
import sjson.json.Serializer._
@ -23,8 +23,12 @@ import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList}
* <p/>
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
object MongoStorage extends MapStorage with VectorStorage with RefStorage with Logging {
private[akka] object MongoStorageBackend extends
MapStorageBackend[AnyRef, AnyRef] with
VectorStorageBackend[AnyRef] with
RefStorageBackend[AnyRef] with
Logging {
// enrich with null safe findOne
class RichDBCollection(value: DBCollection) {
def findOneNS(o: DBObject): Option[DBObject] = {
@ -34,43 +38,43 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
}
}
}
implicit def enrichDBCollection(c: DBCollection) = new RichDBCollection(c)
val KEY = "key"
val VALUE = "value"
val COLLECTION = "akka_coll"
val MONGODB_SERVER_HOSTNAME = config.getString("akka.storage.mongodb.hostname", "127.0.0.1")
val MONGODB_SERVER_DBNAME = config.getString("akka.storage.mongodb.dbname", "testdb")
val MONGODB_SERVER_PORT = config.getInt("akka.storage.mongodb.port", 27017)
val db = new Mongo(MONGODB_SERVER_HOSTNAME, MONGODB_SERVER_PORT, MONGODB_SERVER_DBNAME)
val coll = db.getCollection(COLLECTION)
val db = new Mongo(MONGODB_SERVER_HOSTNAME, MONGODB_SERVER_PORT)
val coll = db.getDB(MONGODB_SERVER_DBNAME).getCollection(COLLECTION)
// FIXME: make this pluggable
private[this] val serializer = SJSON
def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef) {
insertMapStorageEntriesFor(name, List((key, value)))
}
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) {
import java.util.{Map, HashMap}
val m: Map[AnyRef, AnyRef] = new HashMap
for ((k, v) <- entries) {
m.put(k, serializer.out(v))
}
nullSafeFindOne(name) match {
case None =>
case None =>
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m))
case Some(dbo) => {
// collate the maps
val o = dbo.get(VALUE).asInstanceOf[Map[AnyRef, AnyRef]]
o.putAll(m)
// remove existing reference
removeMapStorageFor(name)
// and insert
@ -78,16 +82,16 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
}
}
}
def removeMapStorageFor(name: String) = {
def removeMapStorageFor(name: String): Unit = {
val q = new BasicDBObject
q.put(KEY, name)
coll.remove(q)
}
def removeMapStorageFor(name: String, key: AnyRef) = {
def removeMapStorageFor(name: String, key: AnyRef): Unit = {
nullSafeFindOne(name) match {
case None =>
case None =>
case Some(dbo) => {
val orig = dbo.get(VALUE).asInstanceOf[DBObject].toMap
if (key.isInstanceOf[List[_]]) {
@ -104,10 +108,10 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
}
}
}
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] =
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] =
getValueForKey(name, key.asInstanceOf[String])
def getMapStorageSizeFor(name: String): Int = {
nullSafeFindOne(name) match {
case None => 0
@ -115,55 +119,55 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].keySet.size
}
}
def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = {
val m =
val m =
nullSafeFindOne(name) match {
case None =>
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
}
val n =
val n =
List(m.keySet.toArray: _*).asInstanceOf[List[String]]
val vals =
for(s <- n)
val vals =
for(s <- n)
yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]]))
vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
}
def getMapStorageRangeFor(name: String, start: Option[AnyRef],
finish: Option[AnyRef],
def getMapStorageRangeFor(name: String, start: Option[AnyRef],
finish: Option[AnyRef],
count: Int): List[Tuple2[AnyRef, AnyRef]] = {
val m =
val m =
nullSafeFindOne(name) match {
case None =>
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
}
/**
* <tt>count</tt> is the max number of results to return. Start with
* <tt>count</tt> is the max number of results to return. Start with
* <tt>start</tt> or 0 (if <tt>start</tt> is not defined) and go until
* you hit <tt>finish</tt> or <tt>count</tt>.
*/
val s = if (start.isDefined) start.get.asInstanceOf[Int] else 0
val cnt =
val cnt =
if (finish.isDefined) {
val f = finish.get.asInstanceOf[Int]
if (f >= s) Math.min(count, (f - s)) else count
}
else count
val n =
val n =
List(m.keySet.toArray: _*).asInstanceOf[List[String]].sort((e1, e2) => (e1 compareTo e2) < 0).slice(s, s + cnt)
val vals =
for(s <- n)
val vals =
for(s <- n)
yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]]))
vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
}
private def getValueForKey(name: String, key: String): Option[AnyRef] = {
try {
nullSafeFindOne(name) match {
@ -179,16 +183,16 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
throw new Predef.NoSuchElementException(e.getMessage)
}
}
def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
val q = new BasicDBObject
q.put(KEY, name)
val currentList =
coll.findOneNS(q) match {
case None =>
case None =>
new JArrayList[AnyRef]
case Some(dbo) =>
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JArrayList[AnyRef]]
}
if (!currentList.isEmpty) {
@ -196,26 +200,26 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
// remove before adding
coll.remove(q)
}
// add to the current list
elements.map(serializer.out(_)).foreach(currentList.add(_))
coll.insert(
new BasicDBObject()
.append(KEY, name)
.append(VALUE, currentList)
)
}
def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
insertVectorStorageEntriesFor(name, List(element))
}
def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
try {
val o =
nullSafeFindOne(name) match {
case None =>
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
@ -224,17 +228,17 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
serializer.in[AnyRef](
o.get(index).asInstanceOf[Array[Byte]])
} catch {
case e =>
case e =>
throw new Predef.NoSuchElementException(e.getMessage)
}
}
def getVectorStorageRangeFor(name: String,
def getVectorStorageRangeFor(name: String,
start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
try {
val o =
nullSafeFindOne(name) match {
case None =>
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
@ -242,24 +246,24 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
}
// pick the subrange and make a Scala list
val l =
val l =
List(o.subList(start.get, start.get + count).toArray: _*)
for(e <- l)
for(e <- l)
yield serializer.in[AnyRef](e.asInstanceOf[Array[Byte]])
} catch {
case e =>
case e =>
throw new Predef.NoSuchElementException(e.getMessage)
}
}
// FIXME implement updateVectorStorageEntryFor
def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) = throw new UnsupportedOperationException
def getVectorStorageSizeFor(name: String): Int = {
nullSafeFindOne(name) match {
case None => 0
case Some(dbo) =>
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JList[AnyRef]].size
}
}

View file

@ -1,352 +0,0 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.state
import se.scalablesolutions.akka.stm.TransactionManagement.currentTransaction
import se.scalablesolutions.akka.collection._
import se.scalablesolutions.akka.util.Logging
import org.codehaus.aspectwerkz.proxy.Uuid
class NoTransactionInScopeException extends RuntimeException
sealed abstract class PersistentStateConfig
abstract class PersistentStorageConfig extends PersistentStateConfig
case class CassandraStorageConfig() extends PersistentStorageConfig
case class TerracottaStorageConfig() extends PersistentStorageConfig
case class TokyoCabinetStorageConfig() extends PersistentStorageConfig
case class MongoStorageConfig() extends PersistentStorageConfig
/**
* Example Scala usage.
* <p/>
* New map with generated id.
* <pre>
* val myMap = PersistentState.newMap(CassandraStorageConfig)
* </pre>
*
* New map with user-defined id.
* <pre>
* val myMap = PersistentState.newMap(CassandraStorageConfig, id)
* </pre>
*
* Get map by user-defined id.
* <pre>
* val myMap = PersistentState.getMap(CassandraStorageConfig, id)
* </pre>
*
* Example Java usage:
* <pre>
* TransactionalMap myMap = PersistentState.newMap(new CassandraStorageConfig());
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object PersistentState {
def newMap(config: PersistentStorageConfig): PersistentMap =
// FIXME: won't work across the remote machines, use [http://johannburkard.de/software/uuid/]
newMap(config, Uuid.newUuid.toString)
def newVector(config: PersistentStorageConfig): PersistentVector =
newVector(config, Uuid.newUuid.toString)
def newRef(config: PersistentStorageConfig): PersistentRef =
newRef(config, Uuid.newUuid.toString)
def getMap(config: PersistentStorageConfig, id: String): PersistentMap =
newMap(config, id)
def getVector(config: PersistentStorageConfig, id: String): PersistentVector =
newVector(config, id)
def getRef(config: PersistentStorageConfig, id: String): PersistentRef =
newRef(config, id)
def newMap(config: PersistentStorageConfig, id: String): PersistentMap = config match {
case CassandraStorageConfig() => new CassandraPersistentMap(id)
case MongoStorageConfig() => new MongoPersistentMap(id)
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
def newVector(config: PersistentStorageConfig, id: String): PersistentVector = config match {
case CassandraStorageConfig() => new CassandraPersistentVector(id)
case MongoStorageConfig() => new MongoPersistentVector(id)
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
def newRef(config: PersistentStorageConfig, id: String): PersistentRef = config match {
case CassandraStorageConfig() => new CassandraPersistentRef(id)
case MongoStorageConfig() => new MongoPersistentRef(id)
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
}
/**
* Implementation of <tt>PersistentMap</tt> for every concrete
* storage will have the same workflow. This abstracts the workflow.
*
* Subclasses just need to provide the actual concrete instance for the
* abstract val <tt>storage</tt>.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef]
with Transactional with Committable with Logging {
protected val newAndUpdatedEntries = TransactionalState.newMap[AnyRef, AnyRef]
protected val removedEntries = TransactionalState.newVector[AnyRef]
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
// to be concretized in subclasses
val storage: MapStorage
def commit = {
storage.removeMapStorageFor(uuid, removedEntries.toList)
storage.insertMapStorageEntriesFor(uuid, newAndUpdatedEntries.toList)
if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get.get)
storage.removeMapStorageFor(uuid)
newAndUpdatedEntries.clear
removedEntries.clear
}
def -=(key: AnyRef) = remove(key)
def +=(key: AnyRef, value: AnyRef) = put(key, value)
override def put(key: AnyRef, value: AnyRef): Option[AnyRef] = {
register
newAndUpdatedEntries.put(key, value)
}
override def update(key: AnyRef, value: AnyRef) = {
register
newAndUpdatedEntries.update(key, value)
}
def remove(key: AnyRef) = {
register
removedEntries.add(key)
}
def slice(start: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] =
slice(start, None, count)
def slice(start: Option[AnyRef], finish: Option[AnyRef], count: Int):
List[Tuple2[AnyRef, AnyRef]] = try {
storage.getMapStorageRangeFor(uuid, start, finish, count)
} catch { case e: Exception => Nil }
override def clear = {
register
shouldClearOnCommit.swap(true)
}
override def contains(key: AnyRef): Boolean = try {
newAndUpdatedEntries.contains(key) ||
storage.getMapStorageEntryFor(uuid, key).isDefined
} catch { case e: Exception => false }
override def size: Int = try {
storage.getMapStorageSizeFor(uuid)
} catch { case e: Exception => 0 }
override def get(key: AnyRef): Option[AnyRef] = {
if (newAndUpdatedEntries.contains(key)) {
newAndUpdatedEntries.get(key)
}
else try {
storage.getMapStorageEntryFor(uuid, key)
} catch { case e: Exception => None }
}
override def elements: Iterator[Tuple2[AnyRef, AnyRef]] = {
new Iterator[Tuple2[AnyRef, AnyRef]] {
private val originalList: List[Tuple2[AnyRef, AnyRef]] = try {
storage.getMapStorageFor(uuid)
} catch {
case e: Throwable => Nil
}
// FIXME how to deal with updated entries, these should be replaced in the originalList not just added
private var elements = newAndUpdatedEntries.toList ::: originalList.reverse
override def next: Tuple2[AnyRef, AnyRef]= synchronized {
val element = elements.head
elements = elements.tail
element
}
override def hasNext: Boolean = synchronized { !elements.isEmpty }
}
}
private def register = {
if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
currentTransaction.get.get.register(uuid, this)
}
}
/**
* Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class CassandraPersistentMap(id: String) extends PersistentMap {
val uuid = id
val storage = CassandraStorage
}
/**
* Implements a persistent transactional map based on the MongoDB document storage.
*
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
class MongoPersistentMap(id: String) extends PersistentMap {
val uuid = id
val storage = MongoStorage
}
/**
 * Template for a concrete persistent, transactional vector.
 *
 * Appends are buffered in the transactional `newElems` vector and in-place
 * updates in `updatedElems`; both are flushed to the pluggable `storage`
 * when the enclosing transaction commits.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
trait PersistentVector extends RandomAccessSeq[AnyRef] with Transactional with Committable {
// Pending appends, not yet flushed to storage.
protected val newElems = TransactionalState.newVector[AnyRef]
// Pending in-place updates keyed by index, not yet flushed to storage.
protected val updatedElems = TransactionalState.newMap[Int, AnyRef]
// NOTE(review): declared but never consulted by commit — removal is not implemented (see pop).
protected val removedElems = TransactionalState.newVector[AnyRef]
// NOTE(review): declared but never consulted by commit in this trait.
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
// Concrete subclasses bind this to the actual storage backend.
val storage: VectorStorage
// Flushes buffered appends and updates to storage, then clears the buffers.
def commit = {
// FIXME: should use batch function once the bug is resolved
for (element <- newElems) storage.insertVectorStorageEntryFor(uuid, element)
for (entry <- updatedElems) storage.updateVectorStorageEntryFor(uuid, entry._1, entry._2)
newElems.clear
updatedElems.clear
}
// Alias for add.
def +(elem: AnyRef) = add(elem)
// Buffers an append; requires an enclosing transaction.
def add(elem: AnyRef) = {
register
newElems + elem
}
def apply(index: Int): AnyRef = get(index)
// NOTE(review): uncommitted elements shadow persisted ones by raw index —
// if storage already holds entries for this uuid, index i may resolve to a
// pending element rather than the persisted one; confirm intended semantics.
def get(index: Int): AnyRef = {
if (newElems.size > index) newElems(index)
else storage.getVectorStorageEntryFor(uuid, index)
}
// NOTE(review): this override takes (start, count) while the inherited
// RandomAccessSeq.slice takes (from, until) — callers relying on the
// inherited end-exclusive contract will get different results.
override def slice(start: Int, count: Int): RandomAccessSeq[AnyRef] = slice(Some(start), None, count)
// Range read straight from storage; pending (uncommitted) appends are not included.
def slice(start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[AnyRef] = {
val buffer = new scala.collection.mutable.ArrayBuffer[AnyRef]
storage.getVectorStorageRangeFor(uuid, start, finish, count).foreach(buffer.append(_))
buffer
}
/**
 * Removes the <i>tail</i> element of this vector.
 * Currently always throws: persistent pop is not implemented yet.
 */
// FIXME: implement persistent vector pop
def pop: AnyRef = {
register
throw new UnsupportedOperationException("need to implement persistent vector pop")
}
// NOTE(review): writes through to storage immediately instead of buffering
// in updatedElems — this bypasses the commit step; confirm this is intended.
def update(index: Int, newElem: AnyRef) = {
register
storage.updateVectorStorageEntryFor(uuid, index, newElem)
}
override def first: AnyRef = get(0)
// Last pending element if any, otherwise the last persisted one.
override def last: AnyRef = {
if (newElems.length != 0) newElems.last
else {
val len = length
if (len == 0) throw new NoSuchElementException("Vector is empty")
get(len - 1)
}
}
// Persisted size plus pending (uncommitted) appends.
def length: Int = storage.getVectorStorageSizeFor(uuid) + newElems.length
// Enlists this structure in the current transaction; throws if none is in scope.
private def register = {
if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
currentTransaction.get.get.register(uuid, this)
}
}
/**
 * Cassandra-backed concrete persistent vector: binds the abstract storage
 * of [[PersistentVector]] to the Cassandra distributed key-value store.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
class CassandraPersistentVector(id: String) extends PersistentVector {
  val storage = CassandraStorage
  val uuid = id
}
/**
 * MongoDB-backed concrete persistent vector: binds the abstract storage
 * of [[PersistentVector]] to the MongoDB document store.
 *
 * @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
 */
class MongoPersistentVector(id: String) extends PersistentVector {
  val storage = MongoStorage
  val uuid = id
}
/**
 * Persistent, transactional reference cell with pluggable storage.
 *
 * Writes are buffered in a transactional ref and flushed to the underlying
 * storage when the enclosing transaction commits.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
trait PersistentRef extends Transactional with Committable {
  protected val ref = new TransactionalRef[AnyRef]

  // Concrete subclasses bind this to the actual storage backend.
  val storage: RefStorage

  // Flushes the buffered value (if any) to storage and resets the buffer.
  def commit = {
    if (ref.isDefined) {
      storage.insertRefStorageFor(uuid, ref.get.get)
      ref.swap(null)
    }
  }

  // Buffers a new value; requires an enclosing transaction.
  def swap(elem: AnyRef) = {
    ensureTransactionInScope
    ref.swap(elem)
  }

  // Pending value wins over the persisted one.
  def get: Option[AnyRef] =
    if (ref.isDefined) ref.get
    else storage.getRefStorageFor(uuid)

  def isDefined: Boolean = ref.isDefined || storage.getRefStorageFor(uuid).isDefined

  // Falls back to the lazily-evaluated default when neither buffer nor storage has a value.
  def getOrElse(default: => AnyRef): AnyRef = get match {
    case Some(value) => value
    case None => default
  }

  // Enlists this ref in the current transaction; throws if none is in scope.
  private def ensureTransactionInScope = {
    if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
    currentTransaction.get.get.register(uuid, this)
  }
}
// Cassandra-backed concrete persistent ref.
class CassandraPersistentRef(id: String) extends PersistentRef {
  val storage = CassandraStorage
  val uuid = id
}
// MongoDB-backed concrete persistent ref.
class MongoPersistentRef(id: String) extends PersistentRef {
  val storage = MongoStorage
  val uuid = id
}

View file

@ -4,33 +4,352 @@
package se.scalablesolutions.akka.state
// abstracts persistence storage
trait Storage
import se.scalablesolutions.akka.stm.TransactionManagement.currentTransaction
import se.scalablesolutions.akka.collection._
import se.scalablesolutions.akka.util.Logging
// for Maps
trait MapStorage extends Storage {
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]])
def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef)
def removeMapStorageFor(name: String)
def removeMapStorageFor(name: String, key: AnyRef)
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef]
def getMapStorageSizeFor(name: String): Int
def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]]
def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]]
import org.codehaus.aspectwerkz.proxy.Uuid
class NoTransactionInScopeException extends RuntimeException
/**
 * Factory interface for persistent, transactional data structures
 * (maps, vectors and refs) bound to a concrete storage backend.
 * <p/>
 * Example Scala usage.
 * <p/>
 * New map with generated id.
 * <pre>
 * val myMap = CassandraStorage.newMap
 * </pre>
 *
 * New map with user-defined id.
 * <pre>
 * val myMap = MongoStorage.newMap(id)
 * </pre>
 *
 * Get map by user-defined id.
 * <pre>
 * val myMap = CassandraStorage.getMap(id)
 * </pre>
 *
 * Example Java usage:
 * <pre>
 * PersistentMap<Object, Object> myMap = MongoStorage.newMap();
 * </pre>
 * Or:
 * <pre>
 * MongoPersistentMap myMap = MongoStorage.getMap(id);
 * </pre>
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
trait Storage {
// FIXME: The UUID won't work across the remote machines, use [http://johannburkard.de/software/uuid/]
// Element type each concrete backend stores (e.g. Array[Byte] for Cassandra).
type ElementType
// Create fresh structures with a generated id.
def newMap: PersistentMap[ElementType, ElementType]
def newVector: PersistentVector[ElementType]
def newRef: PersistentRef[ElementType]
// Look up structures by a user-supplied id.
def getMap(id: String): PersistentMap[ElementType, ElementType]
def getVector(id: String): PersistentVector[ElementType]
def getRef(id: String): PersistentRef[ElementType]
// Create structures bound to a user-supplied id.
def newMap(id: String): PersistentMap[ElementType, ElementType]
def newVector(id: String): PersistentVector[ElementType]
def newRef(id: String): PersistentRef[ElementType]
}
// for Vectors
trait VectorStorage extends Storage {
def insertVectorStorageEntryFor(name: String, element: AnyRef)
def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef])
def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef)
def getVectorStorageEntryFor(name: String, index: Int): AnyRef
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef]
def getVectorStorageSizeFor(name: String): Int
/**
 * [[Storage]] implementation bound to the Cassandra backend.
 * Elements are stored as raw byte arrays.
 */
object CassandraStorage extends Storage {
  type ElementType = Array[Byte]

  // Factories bound to a caller-supplied id.
  def newMap(id: String): PersistentMap[ElementType, ElementType] = new CassandraPersistentMap(id)
  def newVector(id: String): PersistentVector[ElementType] = new CassandraPersistentVector(id)
  def newRef(id: String): PersistentRef[ElementType] = new CassandraPersistentRef(id)

  // Factories that generate a fresh uuid for the new structure.
  def newMap: PersistentMap[ElementType, ElementType] = newMap(Uuid.newUuid.toString)
  def newVector: PersistentVector[ElementType] = newVector(Uuid.newUuid.toString)
  def newRef: PersistentRef[ElementType] = newRef(Uuid.newUuid.toString)

  // Lookup by id: binding a structure to an existing id re-attaches to its data.
  def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
  def getVector(id: String): PersistentVector[ElementType] = newVector(id)
  def getRef(id: String): PersistentRef[ElementType] = newRef(id)
}
// for Ref
trait RefStorage extends Storage {
def insertRefStorageFor(name: String, element: AnyRef)
def getRefStorageFor(name: String): Option[AnyRef]
/**
 * [[Storage]] implementation bound to the MongoDB backend.
 * Elements are arbitrary AnyRef values.
 */
object MongoStorage extends Storage {
  type ElementType = AnyRef

  // Factories bound to a caller-supplied id.
  def newMap(id: String): PersistentMap[ElementType, ElementType] = new MongoPersistentMap(id)
  def newVector(id: String): PersistentVector[ElementType] = new MongoPersistentVector(id)
  def newRef(id: String): PersistentRef[ElementType] = new MongoPersistentRef(id)

  // Factories that generate a fresh uuid for the new structure.
  def newMap: PersistentMap[ElementType, ElementType] = newMap(Uuid.newUuid.toString)
  def newVector: PersistentVector[ElementType] = newVector(Uuid.newUuid.toString)
  def newRef: PersistentRef[ElementType] = newRef(Uuid.newUuid.toString)

  // Lookup by id: binding a structure to an existing id re-attaches to its data.
  def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
  def getVector(id: String): PersistentVector[ElementType] = newVector(id)
  def getRef(id: String): PersistentRef[ElementType] = newRef(id)
}
/**
 * Implementation of <tt>PersistentMap</tt> for every concrete
 * storage will have the same workflow. This abstracts the workflow.
 *
 * Writes are buffered in transactional structures and flushed to the
 * pluggable backend when the enclosing transaction commits.
 *
 * Subclasses just need to provide the actual concrete instance for the
 * abstract val <tt>storage</tt>.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
with Transactional with Committable with Logging {
// Pending puts/updates, not yet flushed to storage.
protected val newAndUpdatedEntries = TransactionalState.newMap[K, V]
// Keys pending removal.
protected val removedEntries = TransactionalState.newVector[K]
// Set by clear; consulted at commit time.
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
// to be concretized in subclasses
val storage: MapStorageBackend[K, V]
// Flush order: removals, then inserts, then (optionally) a full wipe.
// NOTE(review): when shouldClearOnCommit is set, the wipe runs AFTER the
// inserts, so entries put in the same transaction are wiped too — confirm
// this is the intended clear semantics.
def commit = {
removedEntries.toList.foreach(key => storage.removeMapStorageFor(uuid, key))
storage.insertMapStorageEntriesFor(uuid, newAndUpdatedEntries.toList)
if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get.get)
storage.removeMapStorageFor(uuid)
newAndUpdatedEntries.clear
removedEntries.clear
}
def -=(key: K) = remove(key)
// NOTE(review): two-argument += differs from the standard mutable.Map
// signature (which takes a Tuple2) — delegates to put.
def +=(key: K, value: V) = put(key, value)
// Buffers the entry; nothing hits storage until commit.
override def put(key: K, value: V): Option[V] = {
register
newAndUpdatedEntries.put(key, value)
}
override def update(key: K, value: V) = {
register
newAndUpdatedEntries.update(key, value)
}
// Buffers the removal; the key disappears from storage at commit.
def remove(key: K) = {
register
removedEntries.add(key)
}
def slice(start: Option[K], count: Int): List[Tuple2[K, V]] =
slice(start, None, count)
// Best-effort range read straight from storage; failures yield Nil.
def slice(start: Option[K], finish: Option[K], count: Int): List[Tuple2[K, V]] = try {
storage.getMapStorageRangeFor(uuid, start, finish, count)
} catch { case e: Exception => Nil }
// Defers the actual wipe to commit via shouldClearOnCommit.
override def clear = {
register
shouldClearOnCommit.swap(true)
}
// Pending entries checked first, then storage; best-effort (false on error).
override def contains(key: K): Boolean = try {
newAndUpdatedEntries.contains(key) ||
storage.getMapStorageEntryFor(uuid, key).isDefined
} catch { case e: Exception => false }
// NOTE(review): counts persisted entries only — pending puts/removals in
// the current transaction are not reflected; best-effort (0 on error).
override def size: Int = try {
storage.getMapStorageSizeFor(uuid)
} catch { case e: Exception => 0 }
// Pending entries shadow persisted ones; storage errors read as None.
override def get(key: K): Option[V] = {
if (newAndUpdatedEntries.contains(key)) {
newAndUpdatedEntries.get(key)
}
else try {
storage.getMapStorageEntryFor(uuid, key)
} catch { case e: Exception => None }
}
// Iterates pending entries followed by the persisted ones (reversed).
override def elements: Iterator[Tuple2[K, V]] = {
new Iterator[Tuple2[K, V]] {
// Snapshot of the persisted entries, taken once at iterator creation.
private val originalList: List[Tuple2[K, V]] = try {
storage.getMapStorageFor(uuid)
} catch {
case e: Throwable => Nil
}
// FIXME how to deal with updated entries, these should be replaced in the originalList not just added
private var elements = newAndUpdatedEntries.toList ::: originalList.reverse
override def next: Tuple2[K, V]= synchronized {
val element = elements.head
elements = elements.tail
element
}
override def hasNext: Boolean = synchronized { !elements.isEmpty }
}
}
// Enlists this map in the current transaction; throws if none is in scope.
private def register = {
if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
currentTransaction.get.get.register(uuid, this)
}
}
/**
 * Cassandra-backed concrete persistent map; keys and values are raw byte
 * arrays, matching the Cassandra backend's element type.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
class CassandraPersistentMap(id: String) extends PersistentMap[Array[Byte], Array[Byte]] {
  val storage = CassandraStorageBackend
  val uuid = id
}
/**
 * MongoDB-backed concrete persistent map; keys and values are arbitrary
 * AnyRef values, matching the MongoDB backend's element type.
 *
 * @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
 */
class MongoPersistentMap(id: String) extends PersistentMap[AnyRef, AnyRef] {
  val storage = MongoStorageBackend
  val uuid = id
}
/**
 * Template for a concrete persistent, transactional vector.
 *
 * Appends are buffered in the transactional `newElems` vector and in-place
 * updates in `updatedElems`; both are flushed to the pluggable `storage`
 * when the enclosing transaction commits.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
trait PersistentVector[T] extends RandomAccessSeq[T] with Transactional with Committable {
// Pending appends, not yet flushed to storage.
protected val newElems = TransactionalState.newVector[T]
// Pending in-place updates keyed by index, not yet flushed to storage.
protected val updatedElems = TransactionalState.newMap[Int, T]
// NOTE(review): declared but never consulted by commit — removal is not implemented (see pop).
protected val removedElems = TransactionalState.newVector[T]
// NOTE(review): declared but never consulted by commit in this trait.
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
// Concrete subclasses bind this to the actual storage backend.
val storage: VectorStorageBackend[T]
// Flushes buffered appends and updates to storage, then clears the buffers.
def commit = {
// FIXME: should use batch function once the bug is resolved
for (element <- newElems) storage.insertVectorStorageEntryFor(uuid, element)
for (entry <- updatedElems) storage.updateVectorStorageEntryFor(uuid, entry._1, entry._2)
newElems.clear
updatedElems.clear
}
// Alias for add.
def +(elem: T) = add(elem)
// Buffers an append; requires an enclosing transaction.
def add(elem: T) = {
register
newElems + elem
}
def apply(index: Int): T = get(index)
// NOTE(review): uncommitted elements shadow persisted ones by raw index —
// if storage already holds entries for this uuid, index i may resolve to a
// pending element rather than the persisted one; confirm intended semantics.
def get(index: Int): T = {
if (newElems.size > index) newElems(index)
else storage.getVectorStorageEntryFor(uuid, index)
}
// NOTE(review): this override takes (start, count) while the inherited
// RandomAccessSeq.slice takes (from, until) — callers relying on the
// inherited end-exclusive contract will get different results.
override def slice(start: Int, count: Int): RandomAccessSeq[T] = slice(Some(start), None, count)
// Range read straight from storage; pending (uncommitted) appends are not included.
def slice(start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[T] = {
val buffer = new scala.collection.mutable.ArrayBuffer[T]
storage.getVectorStorageRangeFor(uuid, start, finish, count).foreach(buffer.append(_))
buffer
}
/**
 * Removes the <i>tail</i> element of this vector.
 * Currently always throws: persistent pop is not implemented yet.
 */
// FIXME: implement persistent vector pop
def pop: T = {
register
throw new UnsupportedOperationException("need to implement persistent vector pop")
}
// NOTE(review): writes through to storage immediately instead of buffering
// in updatedElems — this bypasses the commit step; confirm this is intended.
def update(index: Int, newElem: T) = {
register
storage.updateVectorStorageEntryFor(uuid, index, newElem)
}
override def first: T = get(0)
// Last pending element if any, otherwise the last persisted one.
override def last: T = {
if (newElems.length != 0) newElems.last
else {
val len = length
if (len == 0) throw new NoSuchElementException("Vector is empty")
get(len - 1)
}
}
// Persisted size plus pending (uncommitted) appends.
def length: Int = storage.getVectorStorageSizeFor(uuid) + newElems.length
// Enlists this structure in the current transaction; throws if none is in scope.
private def register = {
if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
currentTransaction.get.get.register(uuid, this)
}
}
/**
 * Cassandra-backed concrete persistent vector over raw byte arrays.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
class CassandraPersistentVector(id: String) extends PersistentVector[Array[Byte]] {
  val storage = CassandraStorageBackend
  val uuid = id
}
/**
 * MongoDB-backed concrete persistent vector over arbitrary AnyRef elements.
 *
 * @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
 */
class MongoPersistentVector(id: String) extends PersistentVector[AnyRef] {
  val storage = MongoStorageBackend
  val uuid = id
}
/**
 * Persistent, transactional reference cell with pluggable storage.
 *
 * Writes are buffered in a transactional ref and flushed to the underlying
 * storage when the enclosing transaction commits.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
trait PersistentRef[T] extends Transactional with Committable {
  protected val ref = new TransactionalRef[T]

  // Concrete subclasses bind this to the actual storage backend.
  val storage: RefStorageBackend[T]

  // Flushes the buffered value (if any) to storage and resets the buffer.
  def commit = {
    if (ref.isDefined) {
      storage.insertRefStorageFor(uuid, ref.get.get)
      ref.swap(null.asInstanceOf[T])
    }
  }

  // Buffers a new value; requires an enclosing transaction.
  def swap(elem: T) = {
    ensureTransactionInScope
    ref.swap(elem)
  }

  // Pending value wins over the persisted one.
  def get: Option[T] =
    if (ref.isDefined) ref.get
    else storage.getRefStorageFor(uuid)

  def isDefined: Boolean = ref.isDefined || storage.getRefStorageFor(uuid).isDefined

  // Falls back to the lazily-evaluated default when neither buffer nor storage has a value.
  def getOrElse(default: => T): T = get match {
    case Some(value) => value
    case None => default
  }

  // Enlists this ref in the current transaction; throws if none is in scope.
  private def ensureTransactionInScope = {
    if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
    currentTransaction.get.get.register(uuid, this)
  }
}
// Cassandra-backed concrete persistent ref (byte-array payload).
class CassandraPersistentRef(id: String) extends PersistentRef[Array[Byte]] {
  val storage = CassandraStorageBackend
  val uuid = id
}
// MongoDB-backed concrete persistent ref (AnyRef payload).
class MongoPersistentRef(id: String) extends PersistentRef[AnyRef] {
  val storage = MongoStorageBackend
  val uuid = id
}

View file

@ -0,0 +1,36 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.state
// abstracts persistence storage
// Marker trait; the typed map/vector/ref backends below refine it.
trait StorageBackend
// for Maps
/**
 * Backend SPI for persistent maps. `name` identifies the owning
 * structure (the uuid of a PersistentMap) in every operation.
 */
trait MapStorageBackend[K, V] extends StorageBackend {
// Bulk insert/update of entries for the named structure.
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[K, V]])
def insertMapStorageEntryFor(name: String, key: K, value: V)
// Remove the whole structure / a single key respectively.
def removeMapStorageFor(name: String)
def removeMapStorageFor(name: String, key: K)
def getMapStorageEntryFor(name: String, key: K): Option[V]
def getMapStorageSizeFor(name: String): Int
def getMapStorageFor(name: String): List[Tuple2[K, V]]
// Range read; start/finish are optional bounds, count caps the result size.
def getMapStorageRangeFor(name: String, start: Option[K], finish: Option[K], count: Int): List[Tuple2[K, V]]
}
// for Vectors
/**
 * Backend SPI for persistent vectors. `name` identifies the owning
 * structure (the uuid of a PersistentVector) in every operation.
 */
trait VectorStorageBackend[T] extends StorageBackend {
// Append a single element / a batch of elements.
def insertVectorStorageEntryFor(name: String, element: T)
def insertVectorStorageEntriesFor(name: String, elements: List[T])
// In-place update of the element at the given index.
def updateVectorStorageEntryFor(name: String, index: Int, elem: T)
def getVectorStorageEntryFor(name: String, index: Int): T
// Range read; start/finish are optional bounds, count caps the result size.
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[T]
def getVectorStorageSizeFor(name: String): Int
}
// for Ref
/**
 * Backend SPI for persistent refs. `name` identifies the owning
 * structure (the uuid of a PersistentRef).
 */
trait RefStorageBackend[T] extends StorageBackend {
// Store (replace) the value for the named ref.
def insertRefStorageFor(name: String, element: T)
def getRefStorageFor(name: String): Option[T]
}

View file

@ -1,13 +1,10 @@
package se.scalablesolutions.akka.state
import akka.actor.Actor
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.Actor
import junit.framework.TestCase
import dispatch._
import org.junit.{Test, Before}
import org.junit.Test
import org.junit.Assert._
case class GetMapState(key: String)
@ -31,35 +28,35 @@ class CassandraPersistentActor extends Actor {
timeout = 100000
makeTransactionRequired
private lazy val mapState: PersistentMap = PersistentState.newMap(CassandraStorageConfig())
private lazy val vectorState: PersistentVector = PersistentState.newVector(CassandraStorageConfig())
private lazy val refState: PersistentRef = PersistentState.newRef(CassandraStorageConfig())
private lazy val mapState = CassandraStorage.newMap
private lazy val vectorState = CassandraStorage.newVector
private lazy val refState = CassandraStorage.newRef
def receive = {
case GetMapState(key) =>
reply(mapState.get(key).get)
reply(mapState.get(key.getBytes("UTF-8")).get)
case GetVectorSize =>
reply(vectorState.length.asInstanceOf[AnyRef])
case GetRefState =>
reply(refState.get.get)
case SetMapState(key, msg) =>
mapState.put(key, msg)
mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8"))
reply(msg)
case SetVectorState(msg) =>
vectorState.add(msg)
vectorState.add(msg.getBytes("UTF-8"))
reply(msg)
case SetRefState(msg) =>
refState.swap(msg)
refState.swap(msg.getBytes("UTF-8"))
reply(msg)
case Success(key, msg) =>
mapState.put(key, msg)
vectorState.add(msg)
refState.swap(msg)
mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8"))
vectorState.add(msg.getBytes("UTF-8"))
refState.swap(msg.getBytes("UTF-8"))
reply(msg)
case Failure(key, msg, failer) =>
mapState.put(key, msg)
vectorState.add(msg)
refState.swap(msg)
mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8"))
vectorState.add(msg.getBytes("UTF-8"))
refState.swap(msg.getBytes("UTF-8"))
failer !! "Failure"
reply(msg)
}
@ -74,14 +71,15 @@ class CassandraPersistentActor extends Actor {
}
class CassandraPersistentActorSpec extends TestCase {
@Test
def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new CassandraPersistentActor
stateful.start
stateful !! SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assertEquals("new state", (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
val result: Array[Byte] = (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get
assertEquals("new state", new String(result, 0, result.length, "UTF-8"))
}
@Test
@ -95,7 +93,8 @@ class CassandraPersistentActorSpec extends TestCase {
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
val result: Array[Byte] = (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get
assertEquals("init", new String(result, 0, result.length, "UTF-8")) // check that state is == init state
}
@Test
@ -127,7 +126,8 @@ class CassandraPersistentActorSpec extends TestCase {
stateful.start
stateful !! SetRefState("init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assertEquals("new state", (stateful !! GetRefState).get)
val result: Array[Byte] = (stateful !! GetRefState).get
assertEquals("new state", new String(result, 0, result.length, "UTF-8"))
}
@Test
@ -141,6 +141,7 @@ class CassandraPersistentActorSpec extends TestCase {
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state
val result: Array[Byte] = (stateful !! GetRefState).get
assertEquals("init", new String(result, 0, result.length, "UTF-8")) // check that state is == init state
}
}

View file

@ -31,10 +31,8 @@ case object LogSize
class BankAccountActor extends Actor {
makeTransactionRequired
private val accountState =
PersistentState.newMap(MongoStorageConfig())
private val txnLog =
PersistentState.newVector(MongoStorageConfig())
private val accountState = MongoStorage.newMap
private val txnLog = MongoStorage.newVector
def receive: PartialFunction[Any, Unit] = {
// check balance

View file

@ -14,7 +14,7 @@ class MongoStorageSpec extends TestCase {
val changeSetM = new scala.collection.mutable.HashMap[AnyRef, AnyRef]
override def setUp = {
MongoStorage.coll.drop
MongoStorageBackend.coll.drop
}
@Test
@ -22,40 +22,40 @@ class MongoStorageSpec extends TestCase {
changeSetV += "debasish" // string
changeSetV += List(1, 2, 3) // Scala List
changeSetV += List(100, 200)
MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
assertEquals(
3,
MongoStorage.getVectorStorageSizeFor("U-A1"))
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
changeSetV.clear
// changeSetV should be reinitialized
changeSetV += List(12, 23, 45)
changeSetV += "maulindu"
MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
assertEquals(
5,
MongoStorage.getVectorStorageSizeFor("U-A1"))
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
// add more to the same changeSetV
changeSetV += "ramanendu"
changeSetV += Map(1 -> "dg", 2 -> "mc")
// add for a diff transaction
MongoStorage.insertVectorStorageEntriesFor("U-A2", changeSetV.toList)
MongoStorageBackend.insertVectorStorageEntriesFor("U-A2", changeSetV.toList)
assertEquals(
4,
MongoStorage.getVectorStorageSizeFor("U-A2"))
MongoStorageBackend.getVectorStorageSizeFor("U-A2"))
// previous transaction change set should remain same
assertEquals(
5,
MongoStorage.getVectorStorageSizeFor("U-A1"))
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
// test single element entry
MongoStorage.insertVectorStorageEntryFor("U-A1", Map(1->1, 2->4, 3->9))
MongoStorageBackend.insertVectorStorageEntryFor("U-A1", Map(1->1, 2->4, 3->9))
assertEquals(
6,
MongoStorage.getVectorStorageSizeFor("U-A1"))
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
}
@Test
@ -64,25 +64,25 @@ class MongoStorageSpec extends TestCase {
// initially everything 0
assertEquals(
0,
MongoStorage.getVectorStorageSizeFor("U-A2"))
MongoStorageBackend.getVectorStorageSizeFor("U-A2"))
assertEquals(
0,
MongoStorage.getVectorStorageSizeFor("U-A1"))
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
// get some stuff
changeSetV += "debasish"
changeSetV += List(BigDecimal(12), BigDecimal(13), BigDecimal(14))
MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
assertEquals(
2,
MongoStorage.getVectorStorageSizeFor("U-A1"))
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
val JsString(str) = MongoStorage.getVectorStorageEntryFor("U-A1", 0).asInstanceOf[JsString]
val JsString(str) = MongoStorageBackend.getVectorStorageEntryFor("U-A1", 0).asInstanceOf[JsString]
assertEquals("debasish", str)
val l = MongoStorage.getVectorStorageEntryFor("U-A1", 1).asInstanceOf[JsValue]
val l = MongoStorageBackend.getVectorStorageEntryFor("U-A1", 1).asInstanceOf[JsValue]
val num_list = list ! num
val num_list(l0) = l
assertEquals(List(12, 13, 14), l0)
@ -91,14 +91,14 @@ class MongoStorageSpec extends TestCase {
changeSetV += Map(1->1, 2->4, 3->9)
changeSetV += BigInt(2310)
changeSetV += List(100, 200, 300)
MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
assertEquals(
5,
MongoStorage.getVectorStorageSizeFor("U-A1"))
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
val r =
MongoStorage.getVectorStorageRangeFor("U-A1", Some(1), None, 3)
MongoStorageBackend.getVectorStorageRangeFor("U-A1", Some(1), None, 3)
assertEquals(3, r.size)
val lr = r(0).asInstanceOf[JsValue]
@ -109,12 +109,12 @@ class MongoStorageSpec extends TestCase {
@Test
def testVectorFetchForNonExistentKeys = {
try {
MongoStorage.getVectorStorageEntryFor("U-A1", 1)
MongoStorageBackend.getVectorStorageEntryFor("U-A1", 1)
fail("should throw an exception")
} catch {case e: Predef.NoSuchElementException => {}}
try {
MongoStorage.getVectorStorageRangeFor("U-A1", Some(2), None, 12)
MongoStorageBackend.getVectorStorageRangeFor("U-A1", Some(2), None, 12)
fail("should throw an exception")
} catch {case e: Predef.NoSuchElementException => {}}
}
@ -128,43 +128,43 @@ class MongoStorageSpec extends TestCase {
changeSetM += "6" -> java.util.Calendar.getInstance.getTime
// insert all into Mongo
MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
assertEquals(
6,
MongoStorage.getMapStorageSizeFor("U-M1"))
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
// individual insert api
MongoStorage.insertMapStorageEntryFor("U-M1", "7", "akka")
MongoStorage.insertMapStorageEntryFor("U-M1", "8", List(23, 25))
MongoStorageBackend.insertMapStorageEntryFor("U-M1", "7", "akka")
MongoStorageBackend.insertMapStorageEntryFor("U-M1", "8", List(23, 25))
assertEquals(
8,
MongoStorage.getMapStorageSizeFor("U-M1"))
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
// add the same changeSet for another transaction
MongoStorage.insertMapStorageEntriesFor("U-M2", changeSetM.toList)
MongoStorageBackend.insertMapStorageEntriesFor("U-M2", changeSetM.toList)
assertEquals(
6,
MongoStorage.getMapStorageSizeFor("U-M2"))
MongoStorageBackend.getMapStorageSizeFor("U-M2"))
// the first transaction should remain the same
assertEquals(
8,
MongoStorage.getMapStorageSizeFor("U-M1"))
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
changeSetM.clear
}
@Test
def testMapContents = {
fillMap
MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
MongoStorage.getMapStorageEntryFor("U-M1", "2") match {
MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
MongoStorageBackend.getMapStorageEntryFor("U-M1", "2") match {
case Some(x) => {
val JsString(str) = x.asInstanceOf[JsValue]
assertEquals("peter", str)
}
case None => fail("should fetch peter")
}
MongoStorage.getMapStorageEntryFor("U-M1", "4") match {
MongoStorageBackend.getMapStorageEntryFor("U-M1", "4") match {
case Some(x) => {
val num_list = list ! num
val num_list(l0) = x.asInstanceOf[JsValue]
@ -172,7 +172,7 @@ class MongoStorageSpec extends TestCase {
}
case None => fail("should fetch list")
}
MongoStorage.getMapStorageEntryFor("U-M1", "3") match {
MongoStorageBackend.getMapStorageEntryFor("U-M1", "3") match {
case Some(x) => {
val num_list = list ! num
val num_list(l0) = x.asInstanceOf[JsValue]
@ -183,7 +183,7 @@ class MongoStorageSpec extends TestCase {
// get the entire map
val l: List[Tuple2[AnyRef, AnyRef]] =
MongoStorage.getMapStorageFor("U-M1")
MongoStorageBackend.getMapStorageFor("U-M1")
assertEquals(4, l.size)
assertTrue(l.map(_._1).contains("1"))
@ -196,7 +196,7 @@ class MongoStorageSpec extends TestCase {
// trying to fetch for a non-existent transaction will throw
try {
MongoStorage.getMapStorageFor("U-M2")
MongoStorageBackend.getMapStorageFor("U-M2")
fail("should throw an exception")
} catch {case e: Predef.NoSuchElementException => {}}
@ -207,11 +207,11 @@ class MongoStorageSpec extends TestCase {
def testMapContentsByRange = {
fillMap
changeSetM += "5" -> Map(1 -> "dg", 2 -> "mc")
MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
// specify start and count
val l: List[Tuple2[AnyRef, AnyRef]] =
MongoStorage.getMapStorageRangeFor(
MongoStorageBackend.getMapStorageRangeFor(
"U-M1", Some(Integer.valueOf(2)), None, 3)
assertEquals(3, l.size)
@ -227,27 +227,27 @@ class MongoStorageSpec extends TestCase {
// specify start, finish and count where finish - start == count
assertEquals(3,
MongoStorage.getMapStorageRangeFor(
MongoStorageBackend.getMapStorageRangeFor(
"U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(5)), 3).size)
// specify start, finish and count where finish - start > count
assertEquals(3,
MongoStorage.getMapStorageRangeFor(
MongoStorageBackend.getMapStorageRangeFor(
"U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(9)), 3).size)
// do not specify start or finish
assertEquals(3,
MongoStorage.getMapStorageRangeFor(
MongoStorageBackend.getMapStorageRangeFor(
"U-M1", None, None, 3).size)
// specify finish and count
assertEquals(3,
MongoStorage.getMapStorageRangeFor(
MongoStorageBackend.getMapStorageRangeFor(
"U-M1", None, Some(Integer.valueOf(3)), 3).size)
// specify start, finish and count where finish < start
assertEquals(3,
MongoStorage.getMapStorageRangeFor(
MongoStorageBackend.getMapStorageRangeFor(
"U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(1)), 3).size)
changeSetM.clear
@ -258,35 +258,35 @@ class MongoStorageSpec extends TestCase {
fillMap
changeSetM += "5" -> Map(1 -> "dg", 2 -> "mc")
MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
assertEquals(5,
MongoStorage.getMapStorageSizeFor("U-M1"))
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
// remove key "3"
MongoStorage.removeMapStorageFor("U-M1", "3")
MongoStorageBackend.removeMapStorageFor("U-M1", "3")
assertEquals(4,
MongoStorage.getMapStorageSizeFor("U-M1"))
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
try {
MongoStorage.getMapStorageEntryFor("U-M1", "3")
MongoStorageBackend.getMapStorageEntryFor("U-M1", "3")
fail("should throw exception")
} catch { case e => {}}
// remove key "4"
MongoStorage.removeMapStorageFor("U-M1", "4")
MongoStorageBackend.removeMapStorageFor("U-M1", "4")
assertEquals(3,
MongoStorage.getMapStorageSizeFor("U-M1"))
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
// remove key "2"
MongoStorage.removeMapStorageFor("U-M1", "2")
MongoStorageBackend.removeMapStorageFor("U-M1", "2")
assertEquals(2,
MongoStorage.getMapStorageSizeFor("U-M1"))
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
// remove the whole stuff
MongoStorage.removeMapStorageFor("U-M1")
MongoStorageBackend.removeMapStorageFor("U-M1")
try {
MongoStorage.getMapStorageFor("U-M1")
MongoStorageBackend.getMapStorageFor("U-M1")
fail("should throw exception")
} catch { case e: NoSuchElementException => {}}
@ -303,14 +303,14 @@ class MongoStorageSpec extends TestCase {
@Test
def testRefStorage = {
MongoStorage.getRefStorageFor("U-R1") match {
MongoStorageBackend.getRefStorageFor("U-R1") match {
case None =>
case Some(o) => fail("should be None")
}
val m = Map("1"->1, "2"->4, "3"->9)
MongoStorage.insertRefStorageFor("U-R1", m)
MongoStorage.getRefStorageFor("U-R1") match {
MongoStorageBackend.insertRefStorageFor("U-R1", m)
MongoStorageBackend.getRefStorageFor("U-R1") match {
case None => fail("should not be empty")
case Some(r) => {
val a = r.asInstanceOf[JsValue]
@ -331,8 +331,8 @@ class MongoStorageSpec extends TestCase {
// insert another one
// the previous one should be replaced
val b = List("100", "jonas")
MongoStorage.insertRefStorageFor("U-R1", b)
MongoStorage.getRefStorageFor("U-R1") match {
MongoStorageBackend.insertRefStorageFor("U-R1", b)
MongoStorageBackend.getRefStorageFor("U-R1") match {
case None => fail("should not be empty")
case Some(r) => {
val a = r.asInstanceOf[JsValue]

View file

@ -12,9 +12,9 @@ import se.scalablesolutions.akka.annotation.transactionrequired;
import se.scalablesolutions.akka.annotation.prerestart;
import se.scalablesolutions.akka.annotation.postrestart;
import se.scalablesolutions.akka.state.PersistentMap;
import se.scalablesolutions.akka.state.PersistentState;
import se.scalablesolutions.akka.state.PersistentMap;
import se.scalablesolutions.akka.state.CassandraStorageConfig;
import se.scalablesolutions.akka.state.CassandraStorage;
import java.nio.ByteBuffer;
/**
* Try service out by invoking (multiple times):
@ -26,21 +26,22 @@ import se.scalablesolutions.akka.state.CassandraStorageConfig;
@Path("/persistentjavacount")
@transactionrequired
public class PersistentSimpleService {
private Object KEY = "COUNTER";
private String KEY = "COUNTER";
private boolean hasStartedTicking = false;
private PersistentMap storage = PersistentState.newMap(new CassandraStorageConfig());
private PersistentMap<byte[], byte[]> storage = CassandraStorage.newMap();
@GET
@Produces({"application/html"})
public String count() {
if (!hasStartedTicking) {
storage.put(KEY, 0);
storage.put(KEY.getBytes(), ByteBuffer.allocate(2).putInt(0).array());
hasStartedTicking = true;
return "Tick: 0\n";
} else {
int counter = (Integer)storage.get(KEY).get() + 1;
storage.put(KEY, counter);
byte[] bytes = (byte[])storage.get(KEY.getBytes()).get();
int counter = ByteBuffer.wrap(bytes).getInt();
storage.put(KEY.getBytes(), ByteBuffer.allocate(4).putInt(counter + 1).array());
return "Tick: " + counter + "\n";
}
}

View file

@ -13,7 +13,6 @@ import se.scalablesolutions.akka.annotation.prerestart;
import se.scalablesolutions.akka.annotation.postrestart;
import se.scalablesolutions.akka.state.TransactionalState;
import se.scalablesolutions.akka.state.TransactionalMap;
import se.scalablesolutions.akka.state.CassandraStorageConfig;
/**
* Try service out by invoking (multiple times):

View file

@ -1,13 +1,13 @@
package sample.lift
import se.scalablesolutions.akka.state.{PersistentState, TransactionalState, CassandraStorageConfig}
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor}
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.state.{CassandraStorage, TransactionalState}
import java.lang.Integer
import javax.ws.rs.core.MultivaluedMap
import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes}
import javax.ws.rs.{GET, Path, Produces}
import java.nio.ByteBuffer
/**
* Try service out by invoking (multiple times):
@ -56,7 +56,7 @@ class PersistentSimpleService extends Actor {
case object Tick
private val KEY = "COUNTER"
private var hasStartedTicking = false
private val storage = PersistentState.newMap(CassandraStorageConfig())
private val storage = CassandraStorage.newMap
@GET
@Produces(Array("text/html"))
@ -64,13 +64,14 @@ class PersistentSimpleService extends Actor {
def receive = {
case Tick => if (hasStartedTicking) {
val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue
storage.put(KEY, new Integer(counter + 1))
reply(<h1>Tick: {counter + 1}</h1>)
val bytes = storage.get(KEY.getBytes).get
val counter = ByteBuffer.wrap(bytes).getInt
storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array)
reply(<success>Tick:{counter + 1}</success>)
} else {
storage.put(KEY, new Integer(0))
storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(0).array)
hasStartedTicking = true
reply(<h1>Tick: 0</h1>)
reply(<success>Tick: 0</success>)
}
}
}

View file

@ -4,18 +4,19 @@
package sample.scala
import se.scalablesolutions.akka.state.{PersistentState, TransactionalState, CassandraStorageConfig}
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor}
import se.scalablesolutions.akka.state.{CassandraStorage, TransactionalState}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
import java.lang.Integer
import java.nio.ByteBuffer
import javax.ws.rs.core.MultivaluedMap
import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam}
import org.atmosphere.annotation.{Broadcast, Suspend}
import org.atmosphere.util.XSSHtmlFilter
import org.atmosphere.cpr.{BroadcastFilter,Broadcaster}
import org.atmosphere.cpr.{Broadcaster, BroadcastFilter}
import org.atmosphere.jersey.Broadcastable
class Boot {
@ -104,7 +105,7 @@ class PersistentSimpleService extends Actor {
case object Tick
private val KEY = "COUNTER"
private var hasStartedTicking = false
private val storage = PersistentState.newMap(CassandraStorageConfig())
private val storage = CassandraStorage.newMap
@GET
@Produces(Array("text/html"))
@ -112,11 +113,12 @@ class PersistentSimpleService extends Actor {
def receive = {
case Tick => if (hasStartedTicking) {
val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue
storage.put(KEY, new Integer(counter + 1))
val bytes = storage.get(KEY.getBytes).get
val counter = ByteBuffer.wrap(bytes).getInt
storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array)
reply(<success>Tick:{counter + 1}</success>)
} else {
storage.put(KEY, new Integer(0))
storage.put(KEY.getBytes, Array(0.toByte))
hasStartedTicking = true
reply(<success>Tick: 0</success>)
}

View file

@ -44,7 +44,7 @@ class DigestAuthenticationService extends DigestAuthenticationActor {
//don't forget to configure your standalone Cassandra instance
//
//makeTransactionRequired
//override def mkNonceMap = PersistentState.newMap(CassandraStorageConfig()).asInstanceOf[scala.collection.mutable.Map[String,Long]]
//override def mkNonceMap = Storage.newMap(CassandraStorageConfig()).asInstanceOf[scala.collection.mutable.Map[String,Long]]
//Use an in-memory nonce-map as default
override def mkNonceMap = new scala.collection.mutable.HashMap[String, Long]

View file

@ -27,19 +27,28 @@ import java.lang.{Float => JFloat, Double => JDouble}
object HashCode {
val SEED = 23
def hash(seed: Int, any: Any): Int = any match {
case value: Boolean => hash(seed, value)
case value: Char => hash(seed, value)
case value: Short => hash(seed, value)
case value: Int => hash(seed, value)
case value: Long => hash(seed, value)
case value: Float => hash(seed, value)
case value: Double => hash(seed, value)
case value: Byte => hash(seed, value)
case value: AnyRef =>
var result = seed
if (value == null) result = hash(result, 0)
else if (!isArray(value)) result = hash(result, value.hashCode())
else for (id <- 0 until JArray.getLength(value)) result = hash(result, JArray.get(value, id)) // is an array
result
}
def hash(seed: Int, value: Boolean): Int = firstTerm(seed) + (if (value) 1 else 0)
def hash(seed: Int, value: Char): Int = firstTerm(seed) + value.asInstanceOf[Int]
def hash(seed: Int, value: Int): Int = firstTerm(seed) + value
def hash(seed: Int, value: Long): Int = firstTerm(seed) + (value ^ (value >>> 32) ).asInstanceOf[Int]
def hash(seed: Int, value: Float): Int = hash(seed, JFloat.floatToIntBits(value))
def hash(seed: Int, value: Double): Int = hash(seed, JDouble.doubleToLongBits(value))
def hash(seed: Int, anyRef: AnyRef): Int = {
var result = seed
if (anyRef == null) result = hash(result, 0)
else if (!isArray(anyRef)) result = hash(result, anyRef.hashCode())
else for (id <- 0 until JArray.getLength(anyRef)) result = hash(result, JArray.get(anyRef, id)) // is an array
result
}
private def firstTerm(seed: Int): Int = PRIME * seed
private def isArray(anyRef: AnyRef): Boolean = anyRef.getClass.isArray

Binary file not shown.