Redis tests now passes with new STM + misc minor changes to Cluster
This commit is contained in:
parent
c3fef4e1bc
commit
dc884020f3
11 changed files with 131 additions and 96 deletions
|
|
@ -309,9 +309,9 @@ trait Actor extends TransactionManagement {
|
||||||
* If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined.
|
* If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined.
|
||||||
* Can be one of:
|
* Can be one of:
|
||||||
* <pre/>
|
* <pre/>
|
||||||
* AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
|
* faultHandler = Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange))
|
||||||
*
|
*
|
||||||
* OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
|
* faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
|
||||||
* </pre>
|
* </pre>
|
||||||
*/
|
*/
|
||||||
protected var faultHandler: Option[FaultHandlingStrategy] = None
|
protected var faultHandler: Option[FaultHandlingStrategy] = None
|
||||||
|
|
|
||||||
|
|
@ -31,10 +31,10 @@ trait BootableActorLoaderService extends Bootable with Logging {
|
||||||
val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL
|
val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL
|
||||||
log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList)
|
log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList)
|
||||||
new URLClassLoader(toDeploy.toArray, ClassLoader.getSystemClassLoader)
|
new URLClassLoader(toDeploy.toArray, ClassLoader.getSystemClassLoader)
|
||||||
} else if (getClass.getClassLoader.getResourceAsStream("akka.conf") ne null) {
|
} else if (getClass.getClassLoader.getResourceAsStream("aop.xml") ne null) {
|
||||||
getClass.getClassLoader
|
getClass.getClassLoader
|
||||||
} else throw new IllegalStateException(
|
} else throw new IllegalStateException(
|
||||||
"AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
|
"AKKA_HOME is not defined and akka-<version>.jar can not be found on the classpath; aborting...")
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,6 @@ trait BootableRemoteActorService extends Bootable with Logging {
|
||||||
|
|
||||||
abstract override def onLoad = {
|
abstract override def onLoad = {
|
||||||
if(config.getBool("akka.remote.server.service", true)){
|
if(config.getBool("akka.remote.server.service", true)){
|
||||||
log.info("Starting up Cluster Service")
|
|
||||||
Cluster.start
|
Cluster.start
|
||||||
super.onLoad //Initialize BootableActorLoaderService before remote service
|
super.onLoad //Initialize BootableActorLoaderService before remote service
|
||||||
log.info("Initializing Remote Actors Service...")
|
log.info("Initializing Remote Actors Service...")
|
||||||
|
|
|
||||||
|
|
@ -60,19 +60,18 @@ private[remote] object ClusterActor {
|
||||||
abstract class BasicClusterActor extends ClusterActor {
|
abstract class BasicClusterActor extends ClusterActor {
|
||||||
import ClusterActor._
|
import ClusterActor._
|
||||||
|
|
||||||
case class Message(sender : ADDR_T,msg : Array[Byte])
|
case class Message(sender : ADDR_T, msg : Array[Byte])
|
||||||
case object PapersPlease extends ClusterMessage
|
case object PapersPlease extends ClusterMessage
|
||||||
case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage
|
case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage
|
||||||
case object Block extends ClusterMessage
|
case object Block extends ClusterMessage
|
||||||
case object Unblock extends ClusterMessage
|
case object Unblock extends ClusterMessage
|
||||||
case class View(othersPresent : Set[ADDR_T]) extends ClusterMessage
|
case class View(othersPresent: Set[ADDR_T]) extends ClusterMessage
|
||||||
case class Zombie(address: ADDR_T) extends ClusterMessage
|
case class Zombie(address: ADDR_T) extends ClusterMessage
|
||||||
case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage
|
case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage
|
||||||
case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage
|
case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage
|
||||||
|
|
||||||
type ADDR_T
|
type ADDR_T
|
||||||
|
|
||||||
|
|
||||||
@volatile private var local: Node = Node(Nil)
|
@volatile private var local: Node = Node(Nil)
|
||||||
@volatile private var remotes: Map[ADDR_T, Node] = Map()
|
@volatile private var remotes: Map[ADDR_T, Node] = Map()
|
||||||
|
|
||||||
|
|
@ -206,17 +205,21 @@ abstract class BasicClusterActor extends ClusterActor {
|
||||||
* Loads a specified ClusterActor and delegates to that instance.
|
* Loads a specified ClusterActor and delegates to that instance.
|
||||||
*/
|
*/
|
||||||
object Cluster extends Cluster with Logging {
|
object Cluster extends Cluster with Logging {
|
||||||
|
lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName
|
||||||
|
|
||||||
@volatile private[remote] var clusterActor: Option[ClusterActor] = None
|
@volatile private[remote] var clusterActor: Option[ClusterActor] = None
|
||||||
@volatile private[remote] var supervisor: Option[Supervisor] = None
|
@volatile private[remote] var supervisor: Option[Supervisor] = None
|
||||||
|
|
||||||
|
// FIXME Use the supervisor member field
|
||||||
|
|
||||||
private[remote] lazy val serializer: Serializer = {
|
private[remote] lazy val serializer: Serializer =
|
||||||
val className = config.getString("akka.remote.cluster.serializer", Serializer.Java.getClass.getName)
|
Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME))
|
||||||
Class.forName(className).newInstance.asInstanceOf[Serializer]
|
.newInstance.asInstanceOf[Serializer]
|
||||||
}
|
|
||||||
|
|
||||||
private[remote] def createClusterActor : Option[ClusterActor] = {
|
private[remote] def createClusterActor: Option[ClusterActor] = {
|
||||||
val name = config.getString("akka.remote.cluster.actor")
|
val name = config.getString("akka.remote.cluster.actor")
|
||||||
|
if (name.isEmpty) throw new IllegalArgumentException(
|
||||||
|
"Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined")
|
||||||
try {
|
try {
|
||||||
name map { fqn =>
|
name map { fqn =>
|
||||||
val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor]
|
val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor]
|
||||||
|
|
@ -225,7 +228,7 @@ object Cluster extends Cluster with Logging {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch {
|
catch {
|
||||||
case e => log.error(e,"Couldn't load Cluster provider: [%s]",name.getOrElse("Not specified")); None
|
case e => log.error(e, "Couldn't load Cluster provider: [%s]", name.getOrElse("Not specified")); None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -250,10 +253,11 @@ object Cluster extends Cluster with Logging {
|
||||||
|
|
||||||
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActor.foreach(_.relayMessage(to, msg))
|
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActor.foreach(_.relayMessage(to, msg))
|
||||||
|
|
||||||
def foreach(f : (RemoteAddress) => Unit) : Unit = clusterActor.foreach(_.foreach(f))
|
def foreach(f: (RemoteAddress) => Unit) : Unit = clusterActor.foreach(_.foreach(f))
|
||||||
|
|
||||||
def start : Unit = synchronized {
|
def start: Unit = synchronized {
|
||||||
if(supervisor.isEmpty) {
|
log.info("Starting up Cluster Service...")
|
||||||
|
if (supervisor.isEmpty) {
|
||||||
for(actor <- createClusterActor;
|
for(actor <- createClusterActor;
|
||||||
sup <- createSupervisor(actor)) {
|
sup <- createSupervisor(actor)) {
|
||||||
clusterActor = Some(actor)
|
clusterActor = Some(actor)
|
||||||
|
|
@ -262,7 +266,8 @@ object Cluster extends Cluster with Logging {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def shutdown : Unit = synchronized {
|
def shutdown: Unit = synchronized {
|
||||||
|
log.info("Shutting down Cluster Service...")
|
||||||
supervisor.foreach(_.stop)
|
supervisor.foreach(_.stop)
|
||||||
supervisor = None
|
supervisor = None
|
||||||
clusterActor = None
|
clusterActor = None
|
||||||
|
|
|
||||||
|
|
@ -79,7 +79,7 @@ object Kernel extends Logging {
|
||||||
(____ /__|_ \__|_ \(____ /
|
(____ /__|_ \__|_ \(____ /
|
||||||
\/ \/ \/ \/
|
\/ \/ \/ \/
|
||||||
""")
|
""")
|
||||||
log.info(" Running version %s", Config.VERSION)
|
log.info(" Running version %s", Config.VERSION)
|
||||||
log.info("==============================")
|
log.info("==============================")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,14 +3,14 @@ package se.scalablesolutions.akka.actor.patterns
|
||||||
import se.scalablesolutions.akka.actor.Actor
|
import se.scalablesolutions.akka.actor.Actor
|
||||||
|
|
||||||
object Patterns {
|
object Patterns {
|
||||||
type PF[A,B] = PartialFunction[A,B]
|
type PF[A, B] = PartialFunction[A, B]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new PartialFunction whose isDefinedAt is a combination
|
* Creates a new PartialFunction whose isDefinedAt is a combination
|
||||||
* of the two parameters, and whose apply is first to call filter.apply and then filtered.apply
|
* of the two parameters, and whose apply is first to call filter.apply and then filtered.apply
|
||||||
*/
|
*/
|
||||||
def filter[A,B](filter : PF[A,Unit],filtered : PF[A,B]) : PF[A,B] = {
|
def filter[A, B](filter: PF[A, Unit], filtered: PF[A, B]): PF[A, B] = {
|
||||||
case a : A if filtered.isDefinedAt(a) && filter.isDefinedAt(a) =>
|
case a: A if filtered.isDefinedAt(a) && filter.isDefinedAt(a) =>
|
||||||
filter(a)
|
filter(a)
|
||||||
filtered(a)
|
filtered(a)
|
||||||
}
|
}
|
||||||
|
|
@ -18,39 +18,42 @@ object Patterns {
|
||||||
/**
|
/**
|
||||||
* Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true
|
* Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true
|
||||||
*/
|
*/
|
||||||
def intercept[A,B](interceptor : (A) => Unit, interceptee : PF[A,B]) : PF[A,B] = filter(
|
def intercept[A, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] = filter(
|
||||||
{ case a if a.isInstanceOf[A] => interceptor(a) },
|
{case a if a.isInstanceOf[A] => interceptor(a)},
|
||||||
interceptee
|
interceptee
|
||||||
)
|
)
|
||||||
|
|
||||||
//FIXME 2.8, use default params with CyclicIterator
|
//FIXME 2.8, use default params with CyclicIterator
|
||||||
def loadBalancerActor(actors : => InfiniteIterator[Actor]) : Actor = new Actor with LoadBalancer {
|
def loadBalancerActor(actors: => InfiniteIterator[Actor]): Actor = new Actor with LoadBalancer {
|
||||||
val seq = actors
|
val seq = actors
|
||||||
}
|
}
|
||||||
|
|
||||||
def dispatcherActor(routing : PF[Any,Actor], msgTransformer : (Any) => Any) : Actor = new Actor with Dispatcher {
|
def dispatcherActor(routing: PF[Any, Actor], msgTransformer: (Any) => Any): Actor = new Actor with Dispatcher {
|
||||||
override def transform(msg : Any) = msgTransformer(msg)
|
override def transform(msg: Any) = msgTransformer(msg)
|
||||||
|
|
||||||
def routes = routing
|
def routes = routing
|
||||||
}
|
}
|
||||||
|
|
||||||
def dispatcherActor(routing : PF[Any,Actor]) : Actor = new Actor with Dispatcher {
|
def dispatcherActor(routing: PF[Any, Actor]): Actor = new Actor with Dispatcher {
|
||||||
def routes = routing
|
def routes = routing
|
||||||
}
|
}
|
||||||
|
|
||||||
def loggerActor(actorToLog : Actor, logger : (Any) => Unit) : Actor = dispatcherActor (
|
def loggerActor(actorToLog: Actor, logger: (Any) => Unit): Actor = dispatcherActor(
|
||||||
{ case _ => actorToLog },
|
{case _ => actorToLog},
|
||||||
logger
|
logger
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
trait Dispatcher { self : Actor =>
|
trait Dispatcher {
|
||||||
|
self: Actor =>
|
||||||
|
|
||||||
protected def transform(msg : Any) : Any = msg
|
protected def transform(msg: Any): Any = msg
|
||||||
protected def routes : PartialFunction[Any,Actor]
|
|
||||||
|
protected def routes: PartialFunction[Any, Actor]
|
||||||
protected def dispatch : PartialFunction[Any,Unit] = {
|
|
||||||
|
protected def dispatch: PartialFunction[Any, Unit] = {
|
||||||
case a if routes.isDefinedAt(a) => {
|
case a if routes.isDefinedAt(a) => {
|
||||||
if(self.sender.isDefined)
|
if (self.sender.isDefined)
|
||||||
routes(a) forward transform(a)
|
routes(a) forward transform(a)
|
||||||
else
|
else
|
||||||
routes(a) send transform(a)
|
routes(a) send transform(a)
|
||||||
|
|
@ -60,19 +63,22 @@ trait Dispatcher { self : Actor =>
|
||||||
def receive = dispatch
|
def receive = dispatch
|
||||||
}
|
}
|
||||||
|
|
||||||
trait LoadBalancer extends Dispatcher { self : Actor =>
|
trait LoadBalancer extends Dispatcher {
|
||||||
protected def seq : InfiniteIterator[Actor]
|
self: Actor =>
|
||||||
|
protected def seq: InfiniteIterator[Actor]
|
||||||
|
|
||||||
protected def routes = { case x if seq.hasNext => seq.next }
|
protected def routes = {case x if seq.hasNext => seq.next}
|
||||||
}
|
}
|
||||||
|
|
||||||
trait InfiniteIterator[T] extends Iterator[T]
|
trait InfiniteIterator[T] extends Iterator[T]
|
||||||
|
|
||||||
class CyclicIterator[T](items : List[T]) extends InfiniteIterator[T] {
|
class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
|
||||||
@volatile private[this] var current : List[T] = items
|
@volatile private[this] var current: List[T] = items
|
||||||
|
|
||||||
def hasNext = items != Nil
|
def hasNext = items != Nil
|
||||||
|
|
||||||
def next = {
|
def next = {
|
||||||
val nc = if(current == Nil) items else current
|
val nc = if (current == Nil) items else current
|
||||||
current = nc.tail
|
current = nc.tail
|
||||||
nc.head
|
nc.head
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,10 +8,11 @@ import se.scalablesolutions.akka.stm.TransactionManagement.transaction
|
||||||
import se.scalablesolutions.akka.collection._
|
import se.scalablesolutions.akka.collection._
|
||||||
import se.scalablesolutions.akka.util.Logging
|
import se.scalablesolutions.akka.util.Logging
|
||||||
|
|
||||||
import org.codehaus.aspectwerkz.proxy.Uuid
|
// FIXME move to 'stm' package + add message with more info
|
||||||
|
|
||||||
class NoTransactionInScopeException extends RuntimeException
|
class NoTransactionInScopeException extends RuntimeException
|
||||||
|
|
||||||
|
class StorageException(message: String) extends RuntimeException(message)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Example Scala usage.
|
* Example Scala usage.
|
||||||
* <p/>
|
* <p/>
|
||||||
|
|
|
||||||
|
|
@ -72,11 +72,11 @@ private [akka] object RedisStorageBackend extends
|
||||||
* base64(T1):base64("debasish.programming_language") -> "scala"
|
* base64(T1):base64("debasish.programming_language") -> "scala"
|
||||||
* </i>
|
* </i>
|
||||||
*/
|
*/
|
||||||
def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) {
|
def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]): Unit = withErrorHandling {
|
||||||
insertMapStorageEntriesFor(name, List((key, value)))
|
insertMapStorageEntriesFor(name, List((key, value)))
|
||||||
}
|
}
|
||||||
|
|
||||||
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]) {
|
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]): Unit = withErrorHandling {
|
||||||
mset(entries.map(e =>
|
mset(entries.map(e =>
|
||||||
(makeRedisKey(name, e._1), new String(e._2))))
|
(makeRedisKey(name, e._1), new String(e._2))))
|
||||||
}
|
}
|
||||||
|
|
@ -89,22 +89,22 @@ private [akka] object RedisStorageBackend extends
|
||||||
* <li>: is chosen since it cannot appear in base64 encoding charset</li>
|
* <li>: is chosen since it cannot appear in base64 encoding charset</li>
|
||||||
* <li>both parts of the key need to be based64 encoded since there can be spaces within each of them</li>
|
* <li>both parts of the key need to be based64 encoded since there can be spaces within each of them</li>
|
||||||
*/
|
*/
|
||||||
private [this] def makeRedisKey(name: String, key: Array[Byte]): String = {
|
private [this] def makeRedisKey(name: String, key: Array[Byte]): String = withErrorHandling {
|
||||||
"%s:%s".format(new String(encode(name.getBytes)), new String(encode(key)))
|
"%s:%s".format(new String(encode(name.getBytes)), new String(encode(key)))
|
||||||
}
|
}
|
||||||
|
|
||||||
private [this] def makeKeyFromRedisKey(redisKey: String) = {
|
private [this] def makeKeyFromRedisKey(redisKey: String) = withErrorHandling {
|
||||||
val nk = redisKey.split(':').map{e: String => decode(e.getBytes)}
|
val nk = redisKey.split(':').map{e: String => decode(e.getBytes)}
|
||||||
(nk(0), nk(1))
|
(nk(0), nk(1))
|
||||||
}
|
}
|
||||||
|
|
||||||
private [this] def mset(entries: List[(String, String)]) {
|
private [this] def mset(entries: List[(String, String)]): Unit = withErrorHandling {
|
||||||
entries.foreach {e: (String, String) =>
|
entries.foreach {e: (String, String) =>
|
||||||
db.set(e._1, e._2)
|
db.set(e._1, e._2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def removeMapStorageFor(name: String): Unit = {
|
def removeMapStorageFor(name: String): Unit = withErrorHandling {
|
||||||
db.keys("%s:*".format(encode(name.getBytes))) match {
|
db.keys("%s:*".format(encode(name.getBytes))) match {
|
||||||
case None =>
|
case None =>
|
||||||
throw new Predef.NoSuchElementException(name + " not present")
|
throw new Predef.NoSuchElementException(name + " not present")
|
||||||
|
|
@ -113,18 +113,19 @@ private [akka] object RedisStorageBackend extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def removeMapStorageFor(name: String, key: Array[Byte]): Unit = {
|
def removeMapStorageFor(name: String, key: Array[Byte]): Unit = withErrorHandling {
|
||||||
db.delete(makeRedisKey(name, key))
|
db.delete(makeRedisKey(name, key))
|
||||||
}
|
}
|
||||||
|
|
||||||
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] =
|
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = withErrorHandling {
|
||||||
db.get(makeRedisKey(name, key)) match {
|
db.get(makeRedisKey(name, key)) match {
|
||||||
case None =>
|
case None =>
|
||||||
throw new Predef.NoSuchElementException(new String(key) + " not present")
|
throw new Predef.NoSuchElementException(new String(key) + " not present")
|
||||||
case Some(s) => Some(s.getBytes)
|
case Some(s) => Some(s.getBytes)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
def getMapStorageSizeFor(name: String): Int = {
|
def getMapStorageSizeFor(name: String): Int = withErrorHandling {
|
||||||
db.keys("%s:*".format(new String(encode(name.getBytes)))) match {
|
db.keys("%s:*".format(new String(encode(name.getBytes)))) match {
|
||||||
case None => 0
|
case None => 0
|
||||||
case Some(keys) =>
|
case Some(keys) =>
|
||||||
|
|
@ -132,7 +133,7 @@ private [akka] object RedisStorageBackend extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = {
|
def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = withErrorHandling {
|
||||||
db.keys("%s:*".format(new String(encode(name.getBytes)))) match {
|
db.keys("%s:*".format(new String(encode(name.getBytes)))) match {
|
||||||
case None =>
|
case None =>
|
||||||
throw new Predef.NoSuchElementException(name + " not present")
|
throw new Predef.NoSuchElementException(name + " not present")
|
||||||
|
|
@ -143,7 +144,7 @@ private [akka] object RedisStorageBackend extends
|
||||||
|
|
||||||
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]],
|
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]],
|
||||||
finish: Option[Array[Byte]],
|
finish: Option[Array[Byte]],
|
||||||
count: Int): List[(Array[Byte], Array[Byte])] = {
|
count: Int): List[(Array[Byte], Array[Byte])] = withErrorHandling {
|
||||||
|
|
||||||
import scala.collection.immutable.TreeMap
|
import scala.collection.immutable.TreeMap
|
||||||
val wholeSorted =
|
val wholeSorted =
|
||||||
|
|
@ -188,19 +189,19 @@ private [akka] object RedisStorageBackend extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) {
|
def insertVectorStorageEntryFor(name: String, element: Array[Byte]): Unit = withErrorHandling {
|
||||||
db.lpush(new String(encode(name.getBytes)), new String(element))
|
db.lpush(new String(encode(name.getBytes)), new String(element))
|
||||||
}
|
}
|
||||||
|
|
||||||
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) {
|
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]): Unit = withErrorHandling {
|
||||||
elements.foreach(insertVectorStorageEntryFor(name, _))
|
elements.foreach(insertVectorStorageEntryFor(name, _))
|
||||||
}
|
}
|
||||||
|
|
||||||
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) {
|
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]): Unit = withErrorHandling {
|
||||||
db.lset(new String(encode(name.getBytes)), index, new String(elem))
|
db.lset(new String(encode(name.getBytes)), index, new String(elem))
|
||||||
}
|
}
|
||||||
|
|
||||||
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
|
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = withErrorHandling {
|
||||||
db.lindex(new String(encode(name.getBytes)), index) match {
|
db.lindex(new String(encode(name.getBytes)), index) match {
|
||||||
case None =>
|
case None =>
|
||||||
throw new Predef.NoSuchElementException(name + " does not have element at " + index)
|
throw new Predef.NoSuchElementException(name + " does not have element at " + index)
|
||||||
|
|
@ -208,7 +209,7 @@ private [akka] object RedisStorageBackend extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = {
|
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = withErrorHandling {
|
||||||
/**
|
/**
|
||||||
* <tt>count</tt> is the max number of results to return. Start with
|
* <tt>count</tt> is the max number of results to return. Start with
|
||||||
* <tt>start</tt> or 0 (if <tt>start</tt> is not defined) and go until
|
* <tt>start</tt> or 0 (if <tt>start</tt> is not defined) and go until
|
||||||
|
|
@ -237,11 +238,11 @@ private [akka] object RedisStorageBackend extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def insertRefStorageFor(name: String, element: Array[Byte]) {
|
def insertRefStorageFor(name: String, element: Array[Byte]): Unit = withErrorHandling {
|
||||||
db.set(new String(encode(name.getBytes)), new String(element))
|
db.set(new String(encode(name.getBytes)), new String(element))
|
||||||
}
|
}
|
||||||
|
|
||||||
def getRefStorageFor(name: String): Option[Array[Byte]] = {
|
def getRefStorageFor(name: String): Option[Array[Byte]] = withErrorHandling {
|
||||||
db.get(new String(encode(name.getBytes))) match {
|
db.get(new String(encode(name.getBytes))) match {
|
||||||
case None =>
|
case None =>
|
||||||
throw new Predef.NoSuchElementException(name + " not present")
|
throw new Predef.NoSuchElementException(name + " not present")
|
||||||
|
|
@ -250,12 +251,13 @@ private [akka] object RedisStorageBackend extends
|
||||||
}
|
}
|
||||||
|
|
||||||
// add to the end of the queue
|
// add to the end of the queue
|
||||||
def enqueue(name: String, item: Array[Byte]): Boolean = {
|
def enqueue(name: String, item: Array[Byte]): Boolean = withErrorHandling {
|
||||||
db.rpush(new String(encode(name.getBytes)), new String(item))
|
db.rpush(new String(encode(name.getBytes)), new String(item))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// pop from the front of the queue
|
// pop from the front of the queue
|
||||||
def dequeue(name: String): Option[Array[Byte]] = {
|
def dequeue(name: String): Option[Array[Byte]] = withErrorHandling {
|
||||||
db.lpop(new String(encode(name.getBytes))) match {
|
db.lpop(new String(encode(name.getBytes))) match {
|
||||||
case None =>
|
case None =>
|
||||||
throw new Predef.NoSuchElementException(name + " not present")
|
throw new Predef.NoSuchElementException(name + " not present")
|
||||||
|
|
@ -265,7 +267,7 @@ private [akka] object RedisStorageBackend extends
|
||||||
}
|
}
|
||||||
|
|
||||||
// get the size of the queue
|
// get the size of the queue
|
||||||
def size(name: String): Int = {
|
def size(name: String): Int = withErrorHandling {
|
||||||
db.llen(new String(encode(name.getBytes))) match {
|
db.llen(new String(encode(name.getBytes))) match {
|
||||||
case None =>
|
case None =>
|
||||||
throw new Predef.NoSuchElementException(name + " not present")
|
throw new Predef.NoSuchElementException(name + " not present")
|
||||||
|
|
@ -275,26 +277,28 @@ private [akka] object RedisStorageBackend extends
|
||||||
|
|
||||||
// return an array of items currently stored in the queue
|
// return an array of items currently stored in the queue
|
||||||
// start is the item to begin, count is how many items to return
|
// start is the item to begin, count is how many items to return
|
||||||
def peek(name: String, start: Int, count: Int): List[Array[Byte]] = count match {
|
def peek(name: String, start: Int, count: Int): List[Array[Byte]] = withErrorHandling {
|
||||||
case 1 =>
|
count match {
|
||||||
db.lindex(new String(encode(name.getBytes)), start) match {
|
case 1 =>
|
||||||
case None =>
|
db.lindex(new String(encode(name.getBytes)), start) match {
|
||||||
throw new Predef.NoSuchElementException("No element at " + start)
|
case None =>
|
||||||
case Some(s) =>
|
throw new Predef.NoSuchElementException("No element at " + start)
|
||||||
List(s.getBytes)
|
case Some(s) =>
|
||||||
}
|
List(s.getBytes)
|
||||||
case n =>
|
}
|
||||||
db.lrange(new String(encode(name.getBytes)), start, start + count - 1) match {
|
case n =>
|
||||||
case None =>
|
db.lrange(new String(encode(name.getBytes)), start, start + count - 1) match {
|
||||||
throw new Predef.NoSuchElementException(
|
case None =>
|
||||||
"No element found between " + start + " and " + (start + count - 1))
|
throw new Predef.NoSuchElementException(
|
||||||
case Some(es) =>
|
"No element found between " + start + " and " + (start + count - 1))
|
||||||
es.map(_.get.getBytes)
|
case Some(es) =>
|
||||||
}
|
es.map(_.get.getBytes)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// completely delete the queue
|
// completely delete the queue
|
||||||
def remove(name: String): Boolean = {
|
def remove(name: String): Boolean = withErrorHandling {
|
||||||
db.delete(new String(encode(name.getBytes))) match {
|
db.delete(new String(encode(name.getBytes))) match {
|
||||||
case Some(1) => true
|
case Some(1) => true
|
||||||
case _ => false
|
case _ => false
|
||||||
|
|
@ -302,7 +306,7 @@ private [akka] object RedisStorageBackend extends
|
||||||
}
|
}
|
||||||
|
|
||||||
// add item to sorted set identified by name
|
// add item to sorted set identified by name
|
||||||
def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = {
|
def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = withErrorHandling {
|
||||||
db.zadd(new String(encode(name.getBytes)), zscore, new String(item)) match {
|
db.zadd(new String(encode(name.getBytes)), zscore, new String(item)) match {
|
||||||
case Some(1) => true
|
case Some(1) => true
|
||||||
case _ => false
|
case _ => false
|
||||||
|
|
@ -310,7 +314,7 @@ private [akka] object RedisStorageBackend extends
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove item from sorted set identified by name
|
// remove item from sorted set identified by name
|
||||||
def zrem(name: String, item: Array[Byte]): Boolean = {
|
def zrem(name: String, item: Array[Byte]): Boolean = withErrorHandling {
|
||||||
db.zrem(new String(encode(name.getBytes)), new String(item)) match {
|
db.zrem(new String(encode(name.getBytes)), new String(item)) match {
|
||||||
case Some(1) => true
|
case Some(1) => true
|
||||||
case _ => false
|
case _ => false
|
||||||
|
|
@ -318,7 +322,7 @@ private [akka] object RedisStorageBackend extends
|
||||||
}
|
}
|
||||||
|
|
||||||
// cardinality of the set identified by name
|
// cardinality of the set identified by name
|
||||||
def zcard(name: String): Int = {
|
def zcard(name: String): Int = withErrorHandling {
|
||||||
db.zcard(new String(encode(name.getBytes))) match {
|
db.zcard(new String(encode(name.getBytes))) match {
|
||||||
case None =>
|
case None =>
|
||||||
throw new Predef.NoSuchElementException(name + " not present")
|
throw new Predef.NoSuchElementException(name + " not present")
|
||||||
|
|
@ -326,7 +330,7 @@ private [akka] object RedisStorageBackend extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def zscore(name: String, item: Array[Byte]): String = {
|
def zscore(name: String, item: Array[Byte]): String = withErrorHandling {
|
||||||
db.zscore(new String(encode(name.getBytes)), new String(item)) match {
|
db.zscore(new String(encode(name.getBytes)), new String(item)) match {
|
||||||
case None =>
|
case None =>
|
||||||
throw new Predef.NoSuchElementException(new String(item) + " not present")
|
throw new Predef.NoSuchElementException(new String(item) + " not present")
|
||||||
|
|
@ -334,7 +338,7 @@ private [akka] object RedisStorageBackend extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = {
|
def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = withErrorHandling {
|
||||||
db.zrange(new String(encode(name.getBytes)), start.toString, end.toString, RedisClient.ASC, false) match {
|
db.zrange(new String(encode(name.getBytes)), start.toString, end.toString, RedisClient.ASC, false) match {
|
||||||
case None =>
|
case None =>
|
||||||
throw new Predef.NoSuchElementException(name + " not present")
|
throw new Predef.NoSuchElementException(name + " not present")
|
||||||
|
|
@ -343,5 +347,16 @@ private [akka] object RedisStorageBackend extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def flushDB = db.flushDb
|
def flushDB = withErrorHandling(db.flushDb)
|
||||||
|
|
||||||
|
private def withErrorHandling[T](body: => T): T = {
|
||||||
|
try {
|
||||||
|
body
|
||||||
|
} catch {
|
||||||
|
case e: java.lang.NullPointerException =>
|
||||||
|
throw new StorageException("Could not connect to Redis server")
|
||||||
|
case e =>
|
||||||
|
throw new StorageException("Error in Redis: " + e.getMessage)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,7 @@ case object LogSize
|
||||||
class AccountActor extends Transactor {
|
class AccountActor extends Transactor {
|
||||||
private lazy val accountState = RedisStorage.newMap
|
private lazy val accountState = RedisStorage.newMap
|
||||||
private lazy val txnLog = RedisStorage.newVector
|
private lazy val txnLog = RedisStorage.newVector
|
||||||
|
//timeout = 5000
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
// check balance
|
// check balance
|
||||||
|
|
@ -86,6 +87,7 @@ class AccountActor extends Transactor {
|
||||||
}
|
}
|
||||||
|
|
||||||
@serializable class PersistentFailerActor extends Transactor {
|
@serializable class PersistentFailerActor extends Transactor {
|
||||||
|
//timeout = 5000
|
||||||
def receive = {
|
def receive = {
|
||||||
case "Failure" =>
|
case "Failure" =>
|
||||||
throw new RuntimeException("expected")
|
throw new RuntimeException("expected")
|
||||||
|
|
@ -138,7 +140,7 @@ class RedisPersistentActorSpec extends TestCase {
|
||||||
bactor.start
|
bactor.start
|
||||||
bactor !! Credit("a-123", 5000)
|
bactor !! Credit("a-123", 5000)
|
||||||
|
|
||||||
assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get)
|
assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get)
|
||||||
|
|
||||||
val failer = new PersistentFailerActor
|
val failer = new PersistentFailerActor
|
||||||
failer.start
|
failer.start
|
||||||
|
|
@ -147,7 +149,7 @@ class RedisPersistentActorSpec extends TestCase {
|
||||||
fail("should throw exception")
|
fail("should throw exception")
|
||||||
} catch { case e: RuntimeException => {}}
|
} catch { case e: RuntimeException => {}}
|
||||||
|
|
||||||
assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get)
|
assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get)
|
||||||
|
|
||||||
// should not count the failed one
|
// should not count the failed one
|
||||||
assertEquals(3, (bactor !! LogSize).get)
|
assertEquals(3, (bactor !! LogSize).get)
|
||||||
|
|
|
||||||
6
akka.iml
6
akka.iml
|
|
@ -15,6 +15,12 @@
|
||||||
</option>
|
</option>
|
||||||
</configuration>
|
</configuration>
|
||||||
</facet>
|
</facet>
|
||||||
|
<facet type="Spring" name="Spring">
|
||||||
|
<configuration />
|
||||||
|
</facet>
|
||||||
|
<facet type="WebBeans" name="Web Beans">
|
||||||
|
<configuration />
|
||||||
|
</facet>
|
||||||
</component>
|
</component>
|
||||||
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5" inherit-compiler-output="false">
|
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5" inherit-compiler-output="false">
|
||||||
<output url="file://$MODULE_DIR$/target/classes" />
|
<output url="file://$MODULE_DIR$/target/classes" />
|
||||||
|
|
|
||||||
|
|
@ -49,6 +49,7 @@
|
||||||
zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6
|
zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6
|
||||||
|
|
||||||
<cluster>
|
<cluster>
|
||||||
|
service = on # FIXME add 'service = on' for <cluster>
|
||||||
name = "default" # The name of the cluster
|
name = "default" # The name of the cluster
|
||||||
#actor = "se.scalablesolutions.akka.remote.JGroupsClusterActor" # FQN of an implementation of ClusterActor
|
#actor = "se.scalablesolutions.akka.remote.JGroupsClusterActor" # FQN of an implementation of ClusterActor
|
||||||
serializer = "se.scalablesolutions.akka.serialization.Serializer$Java" # FQN of the serializer class
|
serializer = "se.scalablesolutions.akka.serialization.Serializer$Java" # FQN of the serializer class
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue