From dc884020f33ebfa4aba81654e1e6c03068ea370b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Thu, 4 Mar 2010 19:02:23 +0100 Subject: [PATCH] Redis tests now passes with new STM + misc minor changes to Cluster --- akka-core/src/main/scala/actor/Actor.scala | 4 +- .../actor/BootableActorLoaderService.scala | 4 +- .../remote/BootableRemoteActorService.scala | 1 - akka-core/src/main/scala/remote/Cluster.scala | 33 +++--- akka-kernel/src/main/scala/Kernel.scala | 2 +- akka-patterns/src/main/scala/Patterns.scala | 62 ++++++----- .../src/main/scala/Storage.scala | 5 +- .../src/main/scala/RedisStorageBackend.scala | 103 ++++++++++-------- .../test/scala/RedisPersistentActorSpec.scala | 6 +- akka.iml | 6 + config/akka-reference.conf | 1 + 11 files changed, 131 insertions(+), 96 deletions(-) diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index df536fb57e..a6b46d903d 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -309,9 +309,9 @@ trait Actor extends TransactionManagement { * If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined. * Can be one of: *
-   *  AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
+   *  faultHandler = Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange))
    *
-   *  OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
+   *  faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
    * 
*/ protected var faultHandler: Option[FaultHandlingStrategy] = None diff --git a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala index 1bacbf6f59..0c84d0965a 100644 --- a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala +++ b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala @@ -31,10 +31,10 @@ trait BootableActorLoaderService extends Bootable with Logging { val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList) new URLClassLoader(toDeploy.toArray, ClassLoader.getSystemClassLoader) - } else if (getClass.getClassLoader.getResourceAsStream("akka.conf") ne null) { + } else if (getClass.getClassLoader.getResourceAsStream("aop.xml") ne null) { getClass.getClassLoader } else throw new IllegalStateException( - "AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting") + "AKKA_HOME is not defined and akka-.jar can not be found on the classpath; aborting...") ) } diff --git a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala index 429fdb61ec..1b56dcea5a 100644 --- a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala +++ b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala @@ -24,7 +24,6 @@ trait BootableRemoteActorService extends Bootable with Logging { abstract override def onLoad = { if(config.getBool("akka.remote.server.service", true)){ - log.info("Starting up Cluster Service") Cluster.start super.onLoad //Initialize BootableActorLoaderService before remote service log.info("Initializing Remote Actors Service...") diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index c2e9069a01..294ce5bd94 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -60,19 +60,18 @@ private[remote] object ClusterActor { abstract class BasicClusterActor extends ClusterActor { import ClusterActor._ - case class Message(sender : ADDR_T,msg : Array[Byte]) + case class Message(sender : ADDR_T, msg : Array[Byte]) case object PapersPlease extends ClusterMessage case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage case object Block extends ClusterMessage case object Unblock extends ClusterMessage - case class View(othersPresent : Set[ADDR_T]) extends ClusterMessage + case class View(othersPresent: Set[ADDR_T]) extends ClusterMessage case class Zombie(address: ADDR_T) extends ClusterMessage case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage type ADDR_T - @volatile private var local: Node = Node(Nil) @volatile private var remotes: Map[ADDR_T, Node] = Map() @@ -206,17 +205,21 @@ abstract class BasicClusterActor extends ClusterActor { * Loads a specified ClusterActor and delegates to that instance. */ object Cluster extends Cluster with Logging { + lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName + @volatile private[remote] var clusterActor: Option[ClusterActor] = None @volatile private[remote] var supervisor: Option[Supervisor] = None + + // FIXME Use the supervisor member field - private[remote] lazy val serializer: Serializer = { - val className = config.getString("akka.remote.cluster.serializer", Serializer.Java.getClass.getName) - Class.forName(className).newInstance.asInstanceOf[Serializer] - } + private[remote] lazy val serializer: Serializer = + Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)) + .newInstance.asInstanceOf[Serializer] - private[remote] def createClusterActor : Option[ClusterActor] = { + private[remote] def createClusterActor: Option[ClusterActor] = { val name = config.getString("akka.remote.cluster.actor") - + if (name.isEmpty) throw new IllegalArgumentException( + "Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined") try { name map { fqn => val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor] @@ -225,7 +228,7 @@ object Cluster extends Cluster with Logging { } } catch { - case e => log.error(e,"Couldn't load Cluster provider: [%s]",name.getOrElse("Not specified")); None + case e => log.error(e, "Couldn't load Cluster provider: [%s]", name.getOrElse("Not specified")); None } } @@ -250,10 +253,11 @@ object Cluster extends Cluster with Logging { def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActor.foreach(_.relayMessage(to, msg)) - def foreach(f : (RemoteAddress) => Unit) : Unit = clusterActor.foreach(_.foreach(f)) + def foreach(f: (RemoteAddress) => Unit) : Unit = clusterActor.foreach(_.foreach(f)) - def start : Unit = synchronized { - if(supervisor.isEmpty) { + def start: Unit = synchronized { + log.info("Starting up Cluster Service...") + if (supervisor.isEmpty) { for(actor <- createClusterActor; sup <- createSupervisor(actor)) { clusterActor = Some(actor) @@ -262,7 +266,8 @@ object Cluster extends Cluster with Logging { } } - def shutdown : Unit = synchronized { + def shutdown: Unit = synchronized { + log.info("Shutting down Cluster Service...") supervisor.foreach(_.stop) supervisor = None clusterActor = None diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala index f63a50a0a7..7b2b2e4693 100644 --- a/akka-kernel/src/main/scala/Kernel.scala +++ b/akka-kernel/src/main/scala/Kernel.scala @@ -79,7 +79,7 @@ object Kernel extends Logging { (____ /__|_ \__|_ \(____ / \/ \/ \/ \/ """) - log.info(" Running version %s", Config.VERSION) + log.info(" Running version %s", Config.VERSION) log.info("==============================") } } diff --git a/akka-patterns/src/main/scala/Patterns.scala b/akka-patterns/src/main/scala/Patterns.scala index b967c07df7..9b7e55ccc9 100644 --- a/akka-patterns/src/main/scala/Patterns.scala +++ b/akka-patterns/src/main/scala/Patterns.scala @@ -3,14 +3,14 @@ package se.scalablesolutions.akka.actor.patterns import se.scalablesolutions.akka.actor.Actor object Patterns { - type PF[A,B] = PartialFunction[A,B] + type PF[A, B] = PartialFunction[A, B] /** * Creates a new PartialFunction whose isDefinedAt is a combination * of the two parameters, and whose apply is first to call filter.apply and then filtered.apply */ - def filter[A,B](filter : PF[A,Unit],filtered : PF[A,B]) : PF[A,B] = { - case a : A if filtered.isDefinedAt(a) && filter.isDefinedAt(a) => + def filter[A, B](filter: PF[A, Unit], filtered: PF[A, B]): PF[A, B] = { + case a: A if filtered.isDefinedAt(a) && filter.isDefinedAt(a) => filter(a) filtered(a) } @@ -18,39 +18,42 @@ object Patterns { /** * Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true */ - def intercept[A,B](interceptor : (A) => Unit, interceptee : PF[A,B]) : PF[A,B] = filter( - { case a if a.isInstanceOf[A] => interceptor(a) }, - interceptee + def intercept[A, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] = filter( + {case a if a.isInstanceOf[A] => interceptor(a)}, + interceptee ) - + //FIXME 2.8, use default params with CyclicIterator - def loadBalancerActor(actors : => InfiniteIterator[Actor]) : Actor = new Actor with LoadBalancer { + def loadBalancerActor(actors: => InfiniteIterator[Actor]): Actor = new Actor with LoadBalancer { val seq = actors } - def dispatcherActor(routing : PF[Any,Actor], msgTransformer : (Any) => Any) : Actor = new Actor with Dispatcher { - override def transform(msg : Any) = msgTransformer(msg) + def dispatcherActor(routing: PF[Any, Actor], msgTransformer: (Any) => Any): Actor = new Actor with Dispatcher { + override def transform(msg: Any) = msgTransformer(msg) + def routes = routing } - - def dispatcherActor(routing : PF[Any,Actor]) : Actor = new Actor with Dispatcher { - def routes = routing + + def dispatcherActor(routing: PF[Any, Actor]): Actor = new Actor with Dispatcher { + def routes = routing } - def loggerActor(actorToLog : Actor, logger : (Any) => Unit) : Actor = dispatcherActor ( - { case _ => actorToLog }, + def loggerActor(actorToLog: Actor, logger: (Any) => Unit): Actor = dispatcherActor( + {case _ => actorToLog}, logger - ) + ) } -trait Dispatcher { self : Actor => +trait Dispatcher { + self: Actor => - protected def transform(msg : Any) : Any = msg - protected def routes : PartialFunction[Any,Actor] - - protected def dispatch : PartialFunction[Any,Unit] = { + protected def transform(msg: Any): Any = msg + + protected def routes: PartialFunction[Any, Actor] + + protected def dispatch: PartialFunction[Any, Unit] = { case a if routes.isDefinedAt(a) => { - if(self.sender.isDefined) + if (self.sender.isDefined) routes(a) forward transform(a) else routes(a) send transform(a) @@ -60,19 +63,22 @@ trait Dispatcher { self : Actor => def receive = dispatch } -trait LoadBalancer extends Dispatcher { self : Actor => - protected def seq : InfiniteIterator[Actor] +trait LoadBalancer extends Dispatcher { + self: Actor => + protected def seq: InfiniteIterator[Actor] - protected def routes = { case x if seq.hasNext => seq.next } + protected def routes = {case x if seq.hasNext => seq.next} } trait InfiniteIterator[T] extends Iterator[T] -class CyclicIterator[T](items : List[T]) extends InfiniteIterator[T] { - @volatile private[this] var current : List[T] = items +class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] { + @volatile private[this] var current: List[T] = items + def hasNext = items != Nil + def next = { - val nc = if(current == Nil) items else current + val nc = if (current == Nil) items else current current = nc.tail nc.head } diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index b4ea8fc381..f52841b817 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -8,10 +8,11 @@ import se.scalablesolutions.akka.stm.TransactionManagement.transaction import se.scalablesolutions.akka.collection._ import se.scalablesolutions.akka.util.Logging -import org.codehaus.aspectwerkz.proxy.Uuid - +// FIXME move to 'stm' package + add message with more info class NoTransactionInScopeException extends RuntimeException +class StorageException(message: String) extends RuntimeException(message) + /** * Example Scala usage. *

diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala index 48945d6b8c..be214087f3 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala @@ -72,11 +72,11 @@ private [akka] object RedisStorageBackend extends * base64(T1):base64("debasish.programming_language") -> "scala" * */ - def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) { + def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]): Unit = withErrorHandling { insertMapStorageEntriesFor(name, List((key, value))) } - def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]) { + def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]): Unit = withErrorHandling { mset(entries.map(e => (makeRedisKey(name, e._1), new String(e._2)))) } @@ -89,22 +89,22 @@ private [akka] object RedisStorageBackend extends *

  • : is chosen since it cannot appear in base64 encoding charset
  • *
  • both parts of the key need to be based64 encoded since there can be spaces within each of them
  • */ - private [this] def makeRedisKey(name: String, key: Array[Byte]): String = { + private [this] def makeRedisKey(name: String, key: Array[Byte]): String = withErrorHandling { "%s:%s".format(new String(encode(name.getBytes)), new String(encode(key))) } - private [this] def makeKeyFromRedisKey(redisKey: String) = { + private [this] def makeKeyFromRedisKey(redisKey: String) = withErrorHandling { val nk = redisKey.split(':').map{e: String => decode(e.getBytes)} (nk(0), nk(1)) } - private [this] def mset(entries: List[(String, String)]) { + private [this] def mset(entries: List[(String, String)]): Unit = withErrorHandling { entries.foreach {e: (String, String) => db.set(e._1, e._2) } } - def removeMapStorageFor(name: String): Unit = { + def removeMapStorageFor(name: String): Unit = withErrorHandling { db.keys("%s:*".format(encode(name.getBytes))) match { case None => throw new Predef.NoSuchElementException(name + " not present") @@ -113,18 +113,19 @@ private [akka] object RedisStorageBackend extends } } - def removeMapStorageFor(name: String, key: Array[Byte]): Unit = { + def removeMapStorageFor(name: String, key: Array[Byte]): Unit = withErrorHandling { db.delete(makeRedisKey(name, key)) } - def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = + def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = withErrorHandling { db.get(makeRedisKey(name, key)) match { case None => throw new Predef.NoSuchElementException(new String(key) + " not present") case Some(s) => Some(s.getBytes) } + } - def getMapStorageSizeFor(name: String): Int = { + def getMapStorageSizeFor(name: String): Int = withErrorHandling { db.keys("%s:*".format(new String(encode(name.getBytes)))) match { case None => 0 case Some(keys) => @@ -132,7 +133,7 @@ private [akka] object RedisStorageBackend extends } } - def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = { + def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = withErrorHandling { db.keys("%s:*".format(new String(encode(name.getBytes)))) match { case None => throw new Predef.NoSuchElementException(name + " not present") @@ -143,7 +144,7 @@ private [akka] object RedisStorageBackend extends def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], - count: Int): List[(Array[Byte], Array[Byte])] = { + count: Int): List[(Array[Byte], Array[Byte])] = withErrorHandling { import scala.collection.immutable.TreeMap val wholeSorted = @@ -188,19 +189,19 @@ private [akka] object RedisStorageBackend extends } } - def insertVectorStorageEntryFor(name: String, element: Array[Byte]) { + def insertVectorStorageEntryFor(name: String, element: Array[Byte]): Unit = withErrorHandling { db.lpush(new String(encode(name.getBytes)), new String(element)) } - def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) { + def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]): Unit = withErrorHandling { elements.foreach(insertVectorStorageEntryFor(name, _)) } - def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) { + def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]): Unit = withErrorHandling { db.lset(new String(encode(name.getBytes)), index, new String(elem)) } - def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = { + def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = withErrorHandling { db.lindex(new String(encode(name.getBytes)), index) match { case None => throw new Predef.NoSuchElementException(name + " does not have element at " + index) @@ -208,7 +209,7 @@ private [akka] object RedisStorageBackend extends } } - def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = { + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = withErrorHandling { /** * count is the max number of results to return. Start with * start or 0 (if start is not defined) and go until @@ -237,11 +238,11 @@ private [akka] object RedisStorageBackend extends } } - def insertRefStorageFor(name: String, element: Array[Byte]) { + def insertRefStorageFor(name: String, element: Array[Byte]): Unit = withErrorHandling { db.set(new String(encode(name.getBytes)), new String(element)) } - def getRefStorageFor(name: String): Option[Array[Byte]] = { + def getRefStorageFor(name: String): Option[Array[Byte]] = withErrorHandling { db.get(new String(encode(name.getBytes))) match { case None => throw new Predef.NoSuchElementException(name + " not present") @@ -250,12 +251,13 @@ private [akka] object RedisStorageBackend extends } // add to the end of the queue - def enqueue(name: String, item: Array[Byte]): Boolean = { + def enqueue(name: String, item: Array[Byte]): Boolean = withErrorHandling { db.rpush(new String(encode(name.getBytes)), new String(item)) } + // pop from the front of the queue - def dequeue(name: String): Option[Array[Byte]] = { + def dequeue(name: String): Option[Array[Byte]] = withErrorHandling { db.lpop(new String(encode(name.getBytes))) match { case None => throw new Predef.NoSuchElementException(name + " not present") @@ -265,7 +267,7 @@ private [akka] object RedisStorageBackend extends } // get the size of the queue - def size(name: String): Int = { + def size(name: String): Int = withErrorHandling { db.llen(new String(encode(name.getBytes))) match { case None => throw new Predef.NoSuchElementException(name + " not present") @@ -275,26 +277,28 @@ private [akka] object RedisStorageBackend extends // return an array of items currently stored in the queue // start is the item to begin, count is how many items to return - def peek(name: String, start: Int, count: Int): List[Array[Byte]] = count match { - case 1 => - db.lindex(new String(encode(name.getBytes)), start) match { - case None => - throw new Predef.NoSuchElementException("No element at " + start) - case Some(s) => - List(s.getBytes) - } - case n => - db.lrange(new String(encode(name.getBytes)), start, start + count - 1) match { - case None => - throw new Predef.NoSuchElementException( - "No element found between " + start + " and " + (start + count - 1)) - case Some(es) => - es.map(_.get.getBytes) - } + def peek(name: String, start: Int, count: Int): List[Array[Byte]] = withErrorHandling { + count match { + case 1 => + db.lindex(new String(encode(name.getBytes)), start) match { + case None => + throw new Predef.NoSuchElementException("No element at " + start) + case Some(s) => + List(s.getBytes) + } + case n => + db.lrange(new String(encode(name.getBytes)), start, start + count - 1) match { + case None => + throw new Predef.NoSuchElementException( + "No element found between " + start + " and " + (start + count - 1)) + case Some(es) => + es.map(_.get.getBytes) + } + } } // completely delete the queue - def remove(name: String): Boolean = { + def remove(name: String): Boolean = withErrorHandling { db.delete(new String(encode(name.getBytes))) match { case Some(1) => true case _ => false @@ -302,7 +306,7 @@ private [akka] object RedisStorageBackend extends } // add item to sorted set identified by name - def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = { + def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = withErrorHandling { db.zadd(new String(encode(name.getBytes)), zscore, new String(item)) match { case Some(1) => true case _ => false @@ -310,7 +314,7 @@ private [akka] object RedisStorageBackend extends } // remove item from sorted set identified by name - def zrem(name: String, item: Array[Byte]): Boolean = { + def zrem(name: String, item: Array[Byte]): Boolean = withErrorHandling { db.zrem(new String(encode(name.getBytes)), new String(item)) match { case Some(1) => true case _ => false @@ -318,7 +322,7 @@ private [akka] object RedisStorageBackend extends } // cardinality of the set identified by name - def zcard(name: String): Int = { + def zcard(name: String): Int = withErrorHandling { db.zcard(new String(encode(name.getBytes))) match { case None => throw new Predef.NoSuchElementException(name + " not present") @@ -326,7 +330,7 @@ private [akka] object RedisStorageBackend extends } } - def zscore(name: String, item: Array[Byte]): String = { + def zscore(name: String, item: Array[Byte]): String = withErrorHandling { db.zscore(new String(encode(name.getBytes)), new String(item)) match { case None => throw new Predef.NoSuchElementException(new String(item) + " not present") @@ -334,7 +338,7 @@ private [akka] object RedisStorageBackend extends } } - def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = { + def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = withErrorHandling { db.zrange(new String(encode(name.getBytes)), start.toString, end.toString, RedisClient.ASC, false) match { case None => throw new Predef.NoSuchElementException(name + " not present") @@ -343,5 +347,16 @@ private [akka] object RedisStorageBackend extends } } - def flushDB = db.flushDb + def flushDB = withErrorHandling(db.flushDb) + + private def withErrorHandling[T](body: => T): T = { + try { + body + } catch { + case e: java.lang.NullPointerException => + throw new StorageException("Could not connect to Redis server") + case e => + throw new StorageException("Error in Redis: " + e.getMessage) + } + } } diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala index 86d4384b70..8c91f0ff61 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala @@ -29,6 +29,7 @@ case object LogSize class AccountActor extends Transactor { private lazy val accountState = RedisStorage.newMap private lazy val txnLog = RedisStorage.newVector + //timeout = 5000 def receive = { // check balance @@ -86,6 +87,7 @@ class AccountActor extends Transactor { } @serializable class PersistentFailerActor extends Transactor { + //timeout = 5000 def receive = { case "Failure" => throw new RuntimeException("expected") @@ -138,7 +140,7 @@ class RedisPersistentActorSpec extends TestCase { bactor.start bactor !! Credit("a-123", 5000) - assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get) + assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get) val failer = new PersistentFailerActor failer.start @@ -147,7 +149,7 @@ class RedisPersistentActorSpec extends TestCase { fail("should throw exception") } catch { case e: RuntimeException => {}} - assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get) + assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get) // should not count the failed one assertEquals(3, (bactor !! LogSize).get) diff --git a/akka.iml b/akka.iml index a39c87020f..74542e8e48 100644 --- a/akka.iml +++ b/akka.iml @@ -15,6 +15,12 @@ + + + + + + diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 296b06428b..5a1c33497b 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -49,6 +49,7 @@ zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6 + service = on # FIXME add 'service = on' for name = "default" # The name of the cluster #actor = "se.scalablesolutions.akka.remote.JGroupsClusterActor" # FQN of an implementation of ClusterActor serializer = "se.scalablesolutions.akka.serialization.Serializer$Java" # FQN of the serializer class