diff --git a/akka-camel/src/main/scala/service/ConsumerPublisher.scala b/akka-camel/src/main/scala/service/ConsumerPublisher.scala index dee30882f1..a6509e2694 100644 --- a/akka-camel/src/main/scala/service/ConsumerPublisher.scala +++ b/akka-camel/src/main/scala/service/ConsumerPublisher.scala @@ -9,7 +9,7 @@ import java.util.concurrent.CountDownLatch import org.apache.camel.builder.RouteBuilder import se.scalablesolutions.akka.actor.{ActorUnregistered, ActorRegistered, Actor} -import se.scalablesolutions.akka.annotation.consume +import se.scalablesolutions.akka.actor.annotation.consume import se.scalablesolutions.akka.camel.{Consumer, CamelContextManager} import se.scalablesolutions.akka.util.Logging diff --git a/akka-camel/src/test/scala/service/CamelServiceTest.scala b/akka-camel/src/test/scala/service/CamelServiceTest.scala index 8fafea4687..a3b0f5c913 100644 --- a/akka-camel/src/test/scala/service/CamelServiceTest.scala +++ b/akka-camel/src/test/scala/service/CamelServiceTest.scala @@ -5,7 +5,7 @@ import org.junit.Assert._ import org.scalatest.junit.JUnitSuite import se.scalablesolutions.akka.actor.Actor -import se.scalablesolutions.akka.annotation.consume +import se.scalablesolutions.akka.actor.annotation.consume import se.scalablesolutions.akka.camel.{CamelContextManager, Consumer, Message} import org.junit.{Ignore, Before, After, Test} diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index d88f0e861b..9b5a6b409a 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -19,7 +19,7 @@ import java.net.InetSocketAddress import java.lang.reflect.{InvocationTargetException, Method} object Annotations { - import se.scalablesolutions.akka.annotation._ + import se.scalablesolutions.akka.actor.annotation._ val oneway = classOf[oneway] val transactionrequired = classOf[transactionrequired] val prerestart = classOf[prerestart] diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index 7156c999bc..4a1d6012a7 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -17,41 +17,42 @@ import scala.collection.immutable.{Map, HashMap} * @author Viktor Klang */ trait Cluster { + /** - * Specifies the cluster name - */ + * Specifies the cluster name + */ def name: String /** - * Adds the specified hostname + port as a local node - * This information will be propagated to other nodes in the cluster - * and will be available at the other nodes through lookup and foreach - */ + * Adds the specified hostname + port as a local node + * This information will be propagated to other nodes in the cluster + * and will be available at the other nodes through lookup and foreach + */ def registerLocalNode(hostname: String, port: Int): Unit /** - * Removes the specified hostname + port from the local node - * This information will be propagated to other nodes in the cluster - * and will no longer be available at the other nodes through lookup and foreach - */ + * Removes the specified hostname + port from the local node + * This information will be propagated to other nodes in the cluster + * and will no longer be available at the other nodes through lookup and foreach + */ def deregisterLocalNode(hostname: String, port: Int): Unit /** - * Sends the message to all Actors of the specified type on all other nodes in the cluster - */ + * Sends the message to all Actors of the specified type on all other nodes in the cluster + */ def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit /** - * Traverses all known remote addresses avaiable at all other nodes in the cluster - * and applies the given PartialFunction on the first address that it's defined at - * The order of application is undefined and may vary - */ + * Traverses all known remote addresses avaiable at all other nodes in the cluster + * and applies the given PartialFunction on the first address that it's defined at + * The order of application is undefined and may vary + */ def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T] /** - * Applies the specified function to all known remote addresses on al other nodes in the cluster - * The order of application is undefined and may vary - */ + * Applies the specified function to all known remote addresses on al other nodes in the cluster + * The order of application is undefined and may vary + */ def foreach(f: (RemoteAddress) => Unit): Unit } @@ -159,7 +160,7 @@ abstract class BasicClusterActor extends ClusterActor { case RegisterLocalNode(s) => { log debug ("RegisterLocalNode: %s", s) - local = Node(local.endpoints + s) + local = Node(s :: local.endpoints) broadcast(Papers(local.endpoints)) } @@ -242,7 +243,7 @@ object Cluster extends Cluster with Logging { "Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined") val serializer = Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)).newInstance.asInstanceOf[Serializer] - serializer setClassLoader loader + serializer.classLoader = Some(loader) try { name map { fqn => diff --git a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala index 287168140a..bfeec1c34e 100644 --- a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala +++ b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala @@ -18,19 +18,17 @@ object RemoteProtocolBuilder { private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf - def setClassLoader(classLoader: ClassLoader) = { - SERIALIZER_JAVA = new Serializer.Java - SERIALIZER_JAVA_JSON = new Serializer.JavaJSON - SERIALIZER_SCALA_JSON = new Serializer.ScalaJSON - SERIALIZER_JAVA.setClassLoader(classLoader) - SERIALIZER_JAVA_JSON.setClassLoader(classLoader) - SERIALIZER_SCALA_JSON.setClassLoader(classLoader) + def setClassLoader(cl: ClassLoader) = { + SERIALIZER_JAVA.classLoader = Some(cl) + SERIALIZER_JAVA_JSON.classLoader = Some(cl) + SERIALIZER_SCALA_JSON.classLoader = Some(cl) } def getMessage(request: RemoteRequest): Any = { request.getProtocol match { case SerializationProtocol.SBINARY => - val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]] + val renderer = Class.forName( + new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]] renderer.fromBytes(request.getMessage.toByteArray) case SerializationProtocol.SCALA_JSON => val manifest = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String] diff --git a/akka-core/src/main/scala/serialization/Serializer.scala b/akka-core/src/main/scala/serialization/Serializer.scala index 1cc930a7eb..c878548711 100644 --- a/akka-core/src/main/scala/serialization/Serializer.scala +++ b/akka-core/src/main/scala/serialization/Serializer.scala @@ -18,13 +18,13 @@ import sjson.json.{Serializer => SJSONSerializer} * @author Jonas Bonér */ trait Serializer { - def deepClone(obj: AnyRef): AnyRef - def out(obj: AnyRef): Array[Byte] - def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef - - protected var classLoader: Option[ClassLoader] = None + var classLoader: Option[ClassLoader] = None - def setClassLoader(cl: ClassLoader) = classLoader = Some(cl) + def deepClone(obj: AnyRef): AnyRef + + def out(obj: AnyRef): Array[Byte] + + def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef } // For Java API @@ -55,7 +55,7 @@ object Serializer { * @author Jonas Bonér */ object Java extends Java - class Java extends Serializer { + trait Java extends Serializer { def deepClone(obj: AnyRef): AnyRef = in(out(obj), None) def out(obj: AnyRef): Array[Byte] = { @@ -67,8 +67,9 @@ object Serializer { } def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { - val in = if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) - else new ObjectInputStream(new ByteArrayInputStream(bytes)) + val in = + if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) + else new ObjectInputStream(new ByteArrayInputStream(bytes)) val obj = in.readObject in.close obj @@ -79,18 +80,21 @@ object Serializer { * @author Jonas Bonér */ object Protobuf extends Protobuf - class Protobuf extends Serializer { + trait Protobuf extends Serializer { def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass)) def out(obj: AnyRef): Array[Byte] = { - if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException("Can't serialize a non-protobuf message using protobuf [" + obj + "]") + if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException( + "Can't serialize a non-protobuf message using protobuf [" + obj + "]") obj.asInstanceOf[Message].toByteArray } def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { - if (!clazz.isDefined) throw new IllegalArgumentException("Need a protobuf message class to be able to serialize bytes using protobuf") + if (!clazz.isDefined) throw new IllegalArgumentException( + "Need a protobuf message class to be able to serialize bytes using protobuf") // TODO: should we cache this method lookup? - val message = clazz.get.getDeclaredMethod("getDefaultInstance", EMPTY_CLASS_ARRAY: _*).invoke(null, EMPTY_ANY_REF_ARRAY: _*).asInstanceOf[Message] + val message = clazz.get.getDeclaredMethod( + "getDefaultInstance", EMPTY_CLASS_ARRAY: _*).invoke(null, EMPTY_ANY_REF_ARRAY: _*).asInstanceOf[Message] message.toBuilder().mergeFrom(bytes).build } @@ -104,7 +108,7 @@ object Serializer { * @author Jonas Bonér */ object JavaJSON extends JavaJSON - class JavaJSON extends Serializer { + trait JavaJSON extends Serializer { private val mapper = new ObjectMapper def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass)) @@ -118,9 +122,11 @@ object Serializer { } def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { - if (!clazz.isDefined) throw new IllegalArgumentException("Can't deserialize JSON to instance if no class is provided") - val in = if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) - else new ObjectInputStream(new ByteArrayInputStream(bytes)) + if (!clazz.isDefined) throw new IllegalArgumentException( + "Can't deserialize JSON to instance if no class is provided") + val in = + if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) + else new ObjectInputStream(new ByteArrayInputStream(bytes)) val obj = mapper.readValue(in, clazz.get).asInstanceOf[AnyRef] in.close obj @@ -136,7 +142,7 @@ object Serializer { * @author Jonas Bonér */ object ScalaJSON extends ScalaJSON - class ScalaJSON extends Serializer { + trait ScalaJSON extends Serializer { def deepClone(obj: AnyRef): AnyRef = in(out(obj), None) def out(obj: AnyRef): Array[Byte] = SJSONSerializer.SJSON.out(obj) @@ -158,7 +164,7 @@ object Serializer { * @author Jonas Bonér */ object SBinary extends SBinary - class SBinary { + trait SBinary { import sbinary.DefaultProtocol._ def deepClone[T <: AnyRef](obj: T)(implicit w : Writes[T], r : Reads[T]): T = in[T](out[T](obj), None) diff --git a/akka-core/src/main/scala/stm/TransactionalState.scala b/akka-core/src/main/scala/stm/TransactionalState.scala index c74fde4ab0..4f35f1199e 100644 --- a/akka-core/src/main/scala/stm/TransactionalState.scala +++ b/akka-core/src/main/scala/stm/TransactionalState.scala @@ -53,6 +53,17 @@ trait Committable { } /** + * Alias to TransactionalRef. + * + * @author Jonas Bonér + */ +object Ref { + def apply[T]() = new Ref[T] +} + +/** + * Alias to Ref. + * * @author Jonas Bonér */ object TransactionalRef { @@ -65,8 +76,17 @@ object TransactionalRef { def apply[T]() = new TransactionalRef[T] } +/** + * Implements a transactional managed reference. + * Alias to TransactionalRef. + * + * @author Jonas Bonér + */ +class Ref[T] extends TransactionalRef[T] + /** * Implements a transactional managed reference. + * Alias to Ref. * * @author Jonas Bonér */ diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/Bar.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/Bar.java index 9a3ff80aca..3d85d89a17 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/Bar.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/Bar.java @@ -1,6 +1,6 @@ package se.scalablesolutions.akka.api; -import se.scalablesolutions.akka.annotation.oneway; +import se.scalablesolutions.akka.actor.annotation.oneway; public interface Bar { @oneway diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/Foo.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/Foo.java index bb9cfd83d4..962f0b9424 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/Foo.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/Foo.java @@ -1,7 +1,7 @@ package se.scalablesolutions.akka.api; import com.google.inject.Inject; -import se.scalablesolutions.akka.annotation.oneway; +import se.scalablesolutions.akka.actor.annotation.oneway; public class Foo extends se.scalablesolutions.akka.serialization.Serializable.JavaJSON { @Inject diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java index 8bc60ad922..afe2f2e232 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java @@ -1,9 +1,9 @@ package se.scalablesolutions.akka.api; -import se.scalablesolutions.akka.annotation.transactionrequired; -import se.scalablesolutions.akka.annotation.prerestart; -import se.scalablesolutions.akka.annotation.postrestart; -import se.scalablesolutions.akka.annotation.inittransactionalstate; +import se.scalablesolutions.akka.actor.annotation.transactionrequired; +import se.scalablesolutions.akka.actor.annotation.prerestart; +import se.scalablesolutions.akka.actor.annotation.postrestart; +import se.scalablesolutions.akka.actor.annotation.inittransactionalstate; import se.scalablesolutions.akka.stm.*; @transactionrequired diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java index ca6b345766..932dc2c162 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java @@ -1,7 +1,7 @@ package se.scalablesolutions.akka.api; -import se.scalablesolutions.akka.annotation.transactionrequired; -import se.scalablesolutions.akka.annotation.inittransactionalstate; +import se.scalablesolutions.akka.actor.annotation.transactionrequired; +import se.scalablesolutions.akka.actor.annotation.inittransactionalstate; import se.scalablesolutions.akka.stm.*; @transactionrequired diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java index caa755ab83..d5c1bdf00c 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java @@ -2,7 +2,7 @@ package se.scalablesolutions.akka.api; import se.scalablesolutions.akka.persistence.common.*; import se.scalablesolutions.akka.persistence.cassandra.*; -import se.scalablesolutions.akka.annotation.inittransactionalstate; +import se.scalablesolutions.akka.actor.annotation.inittransactionalstate; public class PersistentClasher { private PersistentMap state; diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java index 176c551e13..6a8d3353b7 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java @@ -1,7 +1,7 @@ package se.scalablesolutions.akka.api; -import se.scalablesolutions.akka.annotation.inittransactionalstate; -import se.scalablesolutions.akka.annotation.transactionrequired; +import se.scalablesolutions.akka.actor.annotation.inittransactionalstate; +import se.scalablesolutions.akka.actor.annotation.transactionrequired; import se.scalablesolutions.akka.persistence.common.*; import se.scalablesolutions.akka.persistence.cassandra.*; diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java index a919279c0a..bd931ef108 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java @@ -1,7 +1,7 @@ package se.scalablesolutions.akka.api; -import se.scalablesolutions.akka.annotation.inittransactionalstate; -import se.scalablesolutions.akka.annotation.transactionrequired; +import se.scalablesolutions.akka.actor.annotation.inittransactionalstate; +import se.scalablesolutions.akka.actor.annotation.transactionrequired; import se.scalablesolutions.akka.persistence.common.*; import se.scalablesolutions.akka.persistence.cassandra.*; diff --git a/akka-patterns/src/main/scala/Patterns.scala b/akka-patterns/src/main/scala/Patterns.scala index 02f2686ba7..3b7982148e 100644 --- a/akka-patterns/src/main/scala/Patterns.scala +++ b/akka-patterns/src/main/scala/Patterns.scala @@ -18,19 +18,17 @@ object Patterns { /** * Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true */ - def intercept[A, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] = filter( - {case a if a.isInstanceOf[A] => interceptor(a)}, - interceptee - ) + def intercept[A, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] = + filter({case a if a.isInstanceOf[A] => interceptor(a)}, interceptee) //FIXME 2.8, use default params with CyclicIterator def loadBalancerActor(actors: => InfiniteIterator[Actor]): Actor = new Actor with LoadBalancer { val seq = actors } - def dispatcherActor(routing: PF[Any, Actor], msgTransformer: (Any) => Any): Actor = new Actor with Dispatcher { + def dispatcherActor(routing: PF[Any, Actor], msgTransformer: (Any) => Any): Actor = + new Actor with Dispatcher { override def transform(msg: Any) = msgTransformer(msg) - def routes = routing } @@ -38,36 +36,29 @@ object Patterns { def routes = routing } - def loggerActor(actorToLog: Actor, logger: (Any) => Unit): Actor = dispatcherActor( - {case _ => actorToLog}, - logger - ) + def loggerActor(actorToLog: Actor, logger: (Any) => Unit): Actor = + dispatcherActor({case _ => actorToLog}, logger) } -trait Dispatcher { - self: Actor => +trait Dispatcher { self: Actor => protected def transform(msg: Any): Any = msg protected def routes: PartialFunction[Any, Actor] protected def dispatch: PartialFunction[Any, Unit] = { - case a if routes.isDefinedAt(a) => { - if (self.sender.isDefined) - routes(a) forward transform(a) - else - routes(a) send transform(a) - } + case a if routes.isDefinedAt(a) => + if (self.sender.isDefined) routes(a) forward transform(a) + else routes(a) send transform(a) } def receive = dispatch } -trait LoadBalancer extends Dispatcher { - self: Actor => +trait LoadBalancer extends Dispatcher { self: Actor => protected def seq: InfiniteIterator[Actor] - protected def routes = {case x if seq.hasNext => seq.next} + protected def routes = { case x if seq.hasNext => seq.next } } trait InfiniteIterator[T] extends Iterator[T] diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index ff37dde82e..0f0eeac912 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -51,18 +51,24 @@ trait Storage { def newRef: PersistentRef[ElementType] def newQueue: PersistentQueue[ElementType] = // only implemented for redis throw new UnsupportedOperationException + def newSortedSet: PersistentSortedSet[ElementType] = // only implemented for redis + throw new UnsupportedOperationException def getMap(id: String): PersistentMap[ElementType, ElementType] def getVector(id: String): PersistentVector[ElementType] def getRef(id: String): PersistentRef[ElementType] def getQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis throw new UnsupportedOperationException + def getSortedSet(id: String): PersistentSortedSet[ElementType] = // only implemented for redis + throw new UnsupportedOperationException def newMap(id: String): PersistentMap[ElementType, ElementType] def newVector(id: String): PersistentVector[ElementType] def newRef(id: String): PersistentRef[ElementType] def newQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis throw new UnsupportedOperationException + def newSortedSet(id: String): PersistentSortedSet[ElementType] = // only implemented for redis + throw new UnsupportedOperationException } /** @@ -302,7 +308,7 @@ trait PersistentRef[T] extends Transactional with Committable { trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] with Transactional with Committable with Logging { - abstract case class QueueOp + sealed trait QueueOp case object ENQ extends QueueOp case object DEQ extends QueueOp @@ -398,3 +404,119 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] transaction.get.get.register(uuid, this) } } + +/** + * Implements a template for a concrete persistent transactional sorted set based storage. + *
+ * Sorting is done based on a zscore. But the computation of zscore has been kept + * outside the abstraction. + * + * zscore can be implemented in a variety of ways by the calling class: + *
+ * trait ZScorable {
+ * def toZScore: Float
+ * }
+ *
+ * class Foo extends ZScorable {
+ * //.. implemnetation
+ * }
+ *
+ * Or we can also use views:
+ *
+ * class Foo {
+ * //..
+ * }
+ *
+ * implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable {
+ * def toZScore = {
+ * //..
+ * }
+ * }
+ *
+ *
+ * and use foo.toZScore to compute the zscore and pass to the APIs.
+ *
+ * @author
+ */
+trait PersistentSortedSet[A]
+ extends Transactional
+ with Committable {
+
+ protected val newElems = TransactionalState.newMap[A, Float]
+ protected val removedElems = TransactionalState.newVector[A]
+
+ val storage: SortedSetStorageBackend[A]
+
+ def commit = {
+ for ((element, score) <- newElems) storage.zadd(uuid, String.valueOf(score), element)
+ for (element <- removedElems) storage.zrem(uuid, element)
+ newElems.clear
+ removedElems.clear
+ }
+
+ def +(elem: A, score: Float) = add(elem, score)
+
+ def add(elem: A, score: Float) = {
+ register
+ newElems.put(elem, score)
+ }
+
+ def -(elem: A) = remove(elem)
+
+ def remove(elem: A) = {
+ register
+ removedElems.add(elem)
+ }
+
+ private def inStorage(elem: A): Option[Float] = storage.zscore(uuid, elem) match {
+ case Some(s) => Some(s.toFloat)
+ case None => None
+ }
+
+ def contains(elem: A): Boolean = {
+ if (newElems contains elem) true
+ else {
+ inStorage(elem) match {
+ case Some(f) => true
+ case None => false
+ }
+ }
+ }
+
+ def size: Int = newElems.size + storage.zcard(uuid) - removedElems.size
+
+ def zscore(elem: A): Float = {
+ if (newElems contains elem) newElems.get(elem).get
+ inStorage(elem) match {
+ case Some(f) => f
+ case None =>
+ throw new Predef.NoSuchElementException(elem + " not present")
+ }
+ }
+
+ implicit def order(x: (A, Float)) = new Ordered[(A, Float)] {
+ def compare(that: (A, Float)) = x._2 compare that._2
+ }
+
+ def zrange(start: Int, end: Int): List[(A, Float)] = {
+ // need to operate on the whole range
+ // get all from the underlying storage
+ val fromStore = storage.zrangeWithScore(uuid, 0, -1)
+ val ts = scala.collection.immutable.TreeSet(fromStore: _*) ++ newElems.toList
+ val l = ts.size
+
+ // -1 means the last element, -2 means the second last
+ val s = if (start < 0) start + l else start
+ val e =
+ if (end < 0) end + l
+ else if (end >= l) (l - 1)
+ else end
+ // slice is open at the end, we need a closed end range
+ ts.elements.slice(s, e + 1).toList
+ }
+
+ private def register = {
+ if (transaction.get.isEmpty) throw new NoTransactionInScopeException
+ transaction.get.get.register(uuid, this)
+ }
+}
diff --git a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala
index d909f0e4a4..ab0cfaf4d3 100644
--- a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala
+++ b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala
@@ -69,11 +69,15 @@ trait SortedSetStorageBackend[T] extends StorageBackend {
// remove item from sorted set identified by name
def zrem(name: String, item: T): Boolean
- // cardinality of the set idnetified by name
+ // cardinality of the set identified by name
def zcard(name: String): Int
- def zscore(name: String, item: T): String
+ // zscore of the item from sorted set identified by name
+ def zscore(name: String, item: T): Option[Float]
+ // zrange from the sorted set identified by name
def zrange(name: String, start: Int, end: Int): List[T]
-}
+ // zrange with score from the sorted set identified by name
+ def zrangeWithScore(name: String, start: Int, end: Int): List[(T, Float)]
+}
diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala
index 1338a9f8d4..b8aada0572 100644
--- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala
+++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala
@@ -15,16 +15,20 @@ object RedisStorage extends Storage {
def newVector: PersistentVector[ElementType] = newVector(UUID.newUuid.toString)
def newRef: PersistentRef[ElementType] = newRef(UUID.newUuid.toString)
override def newQueue: PersistentQueue[ElementType] = newQueue(UUID.newUuid.toString)
+ override def newSortedSet: PersistentSortedSet[ElementType] = newSortedSet(UUID.newUuid.toString)
def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
def getVector(id: String): PersistentVector[ElementType] = newVector(id)
def getRef(id: String): PersistentRef[ElementType] = newRef(id)
override def getQueue(id: String): PersistentQueue[ElementType] = newQueue(id)
+ override def getSortedSet(id: String): PersistentSortedSet[ElementType] = newSortedSet(id)
def newMap(id: String): PersistentMap[ElementType, ElementType] = new RedisPersistentMap(id)
def newVector(id: String): PersistentVector[ElementType] = new RedisPersistentVector(id)
def newRef(id: String): PersistentRef[ElementType] = new RedisPersistentRef(id)
override def newQueue(id: String): PersistentQueue[ElementType] = new RedisPersistentQueue(id)
+ override def newSortedSet(id: String): PersistentSortedSet[ElementType] =
+ new RedisPersistentSortedSet(id)
}
/**
@@ -63,3 +67,14 @@ class RedisPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] {
val uuid = id
val storage = RedisStorageBackend
}
+
+/**
+ * Implements a persistent transactional sorted set based on the Redis
+ * storage.
+ *
+ * @author Debasish Ghosh
+ */
+class RedisPersistentSortedSet(id: String) extends PersistentSortedSet[Array[Byte]] {
+ val uuid = id
+ val storage = RedisStorageBackend
+}
diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala
index 8bca9c6af6..c48c84fa39 100644
--- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala
+++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala
@@ -364,11 +364,10 @@ private [akka] object RedisStorageBackend extends
}
}
- def zscore(name: String, item: Array[Byte]): String = withErrorHandling {
+ def zscore(name: String, item: Array[Byte]): Option[Float] = withErrorHandling {
db.zscore(new String(encode(name.getBytes)), new String(item)) match {
- case None =>
- throw new Predef.NoSuchElementException(new String(item) + " not present")
- case Some(s) => s
+ case Some(s) => Some(s.toFloat)
+ case None => None
}
}
@@ -380,6 +379,16 @@ private [akka] object RedisStorageBackend extends
s.map(_.get.getBytes)
}
}
+
+ def zrangeWithScore(name: String, start: Int, end: Int): List[(Array[Byte], Float)] = withErrorHandling {
+ db.zrangeWithScore(
+ new String(encode(name.getBytes)), start.toString, end.toString, RedisClient.ASC) match {
+ case None =>
+ throw new Predef.NoSuchElementException(name + " not present")
+ case Some(l) =>
+ l.map{ case (elem, score) => (elem.get.getBytes, score.get.toFloat) }
+ }
+ }
def flushDB = withErrorHandling(db.flushDb)
diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
index 9405789bfd..41ee6fb909 100644
--- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
+++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
@@ -87,7 +87,7 @@ class AccountActor extends Transactor {
}
@serializable class PersistentFailerActor extends Transactor {
- //timeout = 5000
+ // timeout = 5000
def receive = {
case "Failure" =>
throw new RuntimeException("expected")
diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala
new file mode 100644
index 0000000000..a2abb2cd40
--- /dev/null
+++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala
@@ -0,0 +1,237 @@
+package se.scalablesolutions.akka.persistence.redis
+
+import org.scalatest.Spec
+import org.scalatest.Assertions
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+
+import se.scalablesolutions.akka.actor.{Actor, Transactor}
+
+/**
+ * A persistent actor based on Redis sortedset storage.
+ *
+ * Needs a running Redis server.
+ * @author Debasish Ghosh
+ */
+
+trait ZScorable {
+ def zscore: Float
+}
+
+case class Hacker(name: String, birth: String) extends ZScorable {
+ def zscore = birth.toFloat
+}
+
+class SetThresholdViolationException extends RuntimeException
+
+// add hacker to the set
+case class ADD(h: Hacker)
+
+// remove hacker from set
+case class REMOVE(h: Hacker)
+
+// size of the set
+case object SIZE
+
+// zscore of the hacker
+case class SCORE(h: Hacker)
+
+// zrange
+case class RANGE(start: Int, end: Int)
+
+// add and remove subject to the condition that there will be at least 3 hackers
+case class MULTI(add: List[Hacker], rem: List[Hacker], failer: Actor)
+
+class SortedSetActor extends Transactor {
+ timeout = 100000
+ private lazy val hackers = RedisStorage.newSortedSet
+
+ def receive = {
+ case ADD(h) =>
+ hackers.+(h.name.getBytes, h.zscore)
+ reply(true)
+
+ case REMOVE(h) =>
+ hackers.-(h.name.getBytes)
+ reply(true)
+
+ case SIZE =>
+ reply(hackers.size)
+
+ case SCORE(h) =>
+ reply(hackers.zscore(h.name.getBytes))
+
+ case RANGE(s, e) =>
+ reply(hackers.zrange(s, e))
+
+ case MULTI(a, r, failer) =>
+ a.foreach{ h: Hacker =>
+ hackers.+(h.name.getBytes, h.zscore)
+ }
+ try {
+ r.foreach{ h =>
+ if (hackers.size <= 3)
+ throw new SetThresholdViolationException
+ hackers.-(h.name.getBytes)
+ }
+ } catch {
+ case e: Exception =>
+ failer !! "Failure"
+ }
+ reply((a.size, r.size))
+ }
+}
+
+import RedisStorageBackend._
+
+@RunWith(classOf[JUnitRunner])
+class RedisPersistentSortedSetSpec extends
+ Spec with
+ ShouldMatchers with
+ BeforeAndAfterAll {
+
+ override def beforeAll {
+ flushDB
+ println("** destroyed database")
+ }
+
+ override def afterAll {
+ flushDB
+ println("** destroyed database")
+ }
+
+ val h1 = Hacker("Alan kay", "1940")
+ val h2 = Hacker("Richard Stallman", "1953")
+ val h3 = Hacker("Yukihiro Matsumoto", "1965")
+ val h4 = Hacker("Claude Shannon", "1916")
+ val h5 = Hacker("Linus Torvalds", "1969")
+ val h6 = Hacker("Alan Turing", "1912")
+
+ describe("Add and report cardinality of the set") {
+ val qa = new SortedSetActor
+ qa.start
+
+ it("should enter 6 hackers") {
+ qa !! ADD(h1)
+ qa !! ADD(h2)
+ qa !! ADD(h3)
+ qa !! ADD(h4)
+ qa !! ADD(h5)
+ qa !! ADD(h6)
+ (qa !! SIZE).get.asInstanceOf[Int] should equal(6)
+ }
+
+ it("should fetch correct scores for hackers") {
+ (qa !! SCORE(h1)).get.asInstanceOf[Float] should equal(1940.0f)
+ (qa !! SCORE(h5)).get.asInstanceOf[Float] should equal(1969.0f)
+ (qa !! SCORE(h6)).get.asInstanceOf[Float] should equal(1912.0f)
+ }
+
+ it("should fetch proper range") {
+ (qa !! RANGE(0, 4)).get.asInstanceOf[List[_]].size should equal(5)
+ (qa !! RANGE(0, 6)).get.asInstanceOf[List[_]].size should equal(6)
+ }
+
+ it("should remove and throw exception for removing non-existent hackers") {
+ qa !! REMOVE(h2)
+ (qa !! SIZE).get.asInstanceOf[Int] should equal(5)
+ qa !! REMOVE(h3)
+ (qa !! SIZE).get.asInstanceOf[Int] should equal(4)
+ val h7 = Hacker("Paul Snively", "1952")
+ try {
+ qa !! REMOVE(h7)
+ }
+ catch {
+ case e: Predef.NoSuchElementException =>
+ e.getMessage should endWith("not present")
+ }
+ }
+
+ it("should change score for entering the same hacker name with diff score") {
+ (qa !! SIZE).get.asInstanceOf[Int] should equal(4)
+
+ // same name as h6
+ val h7 = Hacker("Alan Turing", "1992")
+ qa !! ADD(h7)
+
+ // size remains same
+ (qa !! SIZE).get.asInstanceOf[Int] should equal(4)
+
+ // score updated
+ (qa !! SCORE(h7)).get.asInstanceOf[Float] should equal(1992.0f)
+ }
+ }
+
+ describe("Transaction semantics") {
+ it("should rollback on exception") {
+ val qa = new SortedSetActor
+ qa.start
+
+ val failer = new PersistentFailerActor
+ failer.start
+
+ (qa !! SIZE).get.asInstanceOf[Int] should equal(0)
+ val add = List(h1, h2, h3, h4)
+ val rem = List(h2)
+ (qa !! MULTI(add, rem, failer)).get.asInstanceOf[Tuple2[Int, Int]] should equal((4,1))
+ (qa !! SIZE).get.asInstanceOf[Int] should equal(3)
+ // size == 3
+
+ // add 2 more
+ val add1 = List(h5, h6)
+
+ // remove 3
+ val rem1 = List(h1, h3, h4)
+ try {
+ qa !! MULTI(add1, rem1, failer)
+ } catch { case e: Exception => {}
+ }
+ (qa !! SIZE).get.asInstanceOf[Int] should equal(3)
+ }
+ }
+
+ describe("zrange") {
+ it ("should report proper range") {
+ val qa = new SortedSetActor
+ qa.start
+ qa !! ADD(h1)
+ qa !! ADD(h2)
+ qa !! ADD(h3)
+ qa !! ADD(h4)
+ qa !! ADD(h5)
+ qa !! ADD(h6)
+ (qa !! SIZE).get.asInstanceOf[Int] should equal(6)
+ val l = (qa !! RANGE(0, 6)).get.asInstanceOf[List[(Array[Byte], Float)]]
+ l.map { case (e, s) => (new String(e), s) }.head should equal(("Alan Turing", 1912.0f))
+ val h7 = Hacker("Alan Turing", "1992")
+ qa !! ADD(h7)
+ (qa !! SIZE).get.asInstanceOf[Int] should equal(6)
+ val m = (qa !! RANGE(0, 6)).get.asInstanceOf[List[(Array[Byte], Float)]]
+ m.map { case (e, s) => (new String(e), s) }.head should equal(("Claude Shannon", 1916.0f))
+ }
+
+ it ("should report proper rge") {
+ val qa = new SortedSetActor
+ qa.start
+ qa !! ADD(h1)
+ qa !! ADD(h2)
+ qa !! ADD(h3)
+ qa !! ADD(h4)
+ qa !! ADD(h5)
+ qa !! ADD(h6)
+ (qa !! SIZE).get.asInstanceOf[Int] should equal(6)
+ (qa !! RANGE(0, 5)).get.asInstanceOf[List[_]].size should equal(6)
+ (qa !! RANGE(0, 6)).get.asInstanceOf[List[_]].size should equal(6)
+ (qa !! RANGE(0, 3)).get.asInstanceOf[List[_]].size should equal(4)
+ (qa !! RANGE(0, 1)).get.asInstanceOf[List[_]].size should equal(2)
+ (qa !! RANGE(0, 0)).get.asInstanceOf[List[_]].size should equal(1)
+ (qa !! RANGE(3, 1)).get.asInstanceOf[List[_]].size should equal(0)
+ (qa !! RANGE(0, -1)).get.asInstanceOf[List[_]].size should equal(6)
+ (qa !! RANGE(0, -2)).get.asInstanceOf[List[_]].size should equal(5)
+ (qa !! RANGE(0, -4)).get.asInstanceOf[List[_]].size should equal(3)
+ (qa !! RANGE(-4, -1)).get.asInstanceOf[List[_]].size should equal(4)
+ }
+ }
+}
diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala
index cfe704c6ba..44081a43c6 100644
--- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala
+++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala
@@ -191,10 +191,10 @@ class RedisStorageBackendSpec extends
zcard("hackers") should equal(6)
- zscore("hackers", "alan turing".getBytes) should equal("1912")
- zscore("hackers", "richard stallman".getBytes) should equal("1953")
- zscore("hackers", "claude shannon".getBytes) should equal("1916")
- zscore("hackers", "linus torvalds".getBytes) should equal("1969")
+ zscore("hackers", "alan turing".getBytes).get should equal(1912.0f)
+ zscore("hackers", "richard stallman".getBytes).get should equal(1953.0f)
+ zscore("hackers", "claude shannon".getBytes).get should equal(1916.0f)
+ zscore("hackers", "linus torvalds".getBytes).get should equal(1969.0f)
val s: List[Array[Byte]] = zrange("hackers", 0, 2)
s.size should equal(3)
@@ -206,6 +206,10 @@ class RedisStorageBackendSpec extends
val t: List[Array[Byte]] = zrange("hackers", 0, -1)
t.size should equal(6)
t.map(new String(_)) should equal(sorted)
+
+ val u: List[(Array[Byte], Float)] = zrangeWithScore("hackers", 0, -1)
+ u.size should equal(6)
+ u.map{ case (e, s) => new String(e) } should equal(sorted)
}
}
}
diff --git a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala
index 11367dedb4..c82b29afc9 100644
--- a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala
+++ b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala
@@ -1,7 +1,7 @@
package sample.camel
import se.scalablesolutions.akka.actor.{Actor, RemoteActor}
-import se.scalablesolutions.akka.annotation.consume
+import se.scalablesolutions.akka.actor.annotation.consume
import se.scalablesolutions.akka.camel.{Producer, Message, Consumer}
import se.scalablesolutions.akka.util.Logging
diff --git a/akka-samples/akka-sample-chat/README b/akka-samples/akka-sample-chat/README
index d2049cd7c0..88720d8c55 100644
--- a/akka-samples/akka-sample-chat/README
+++ b/akka-samples/akka-sample-chat/README
@@ -10,19 +10,22 @@ For details on how to set up Redis server have a look at http://code.google.com/
Then to run the sample:
-1. Set ‘AKKA_HOME’ environment variable to the root of the Akka distribution.
-2. Open up a shell and step into the Akka distribution root folder.
-3. Build Akka by invoking ‘mvn install -Dmaven.test.skip=true’. This will also build the sample application and deploy it to the ‘$AKKA_HOME/deploy’ directory.
-4. Run the microkernel
- export AKKA_HOME=...
- cd $AKKA_HOME
- java -jar ./dist/akka-0.6.jar
-5. Now start up a new shell and go down into the ‘./akka-samples/akka-sample-chat’ directory.
-6. Invoke ‘mvn scala:console -o’. This will give you a Scala REPL (interpreter) with the chat application and all its dependency JARs on the classpath.
-7. Simply paste in the whole code block with the ‘Runner’ object above and invoke ‘Runner.run’. This runs a simulated client session that will connect to the running server in the microkernel.
-8. Invoke ‘Runner.run’ again and again…
+1. Install the Redis network storage. Download it from [http://code.google.com/p/redis/].
+2. Open up a shell and start up an instance of Redis.
+3. Fire up two shells. For each of them:
+ - Step down into to the root of the Akka distribution.
+ - Set 'export AKKA_HOME=