org.scala-tools
javautils
diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala
index 1858952f40..d88f0e861b 100644
--- a/akka-core/src/main/scala/actor/ActiveObject.scala
+++ b/akka-core/src/main/scala/actor/ActiveObject.scala
@@ -6,7 +6,7 @@ package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
-import se.scalablesolutions.akka.dispatch.{MessageDispatcher, FutureResult}
+import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util._
@@ -299,7 +299,7 @@ private[akka] sealed class ActiveObjectAspect {
}
}
- private def getResultOrThrowException[T](future: FutureResult): Option[T] =
+ private def getResultOrThrowException[T](future: Future): Option[T] =
if (future.exception.isDefined) {
val (_, cause) = future.exception.get
throw cause
diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala
index a1eb6bd309..60f3967e6c 100644
--- a/akka-core/src/main/scala/actor/Actor.scala
+++ b/akka-core/src/main/scala/actor/Actor.scala
@@ -10,13 +10,14 @@ import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, F
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.stm.Transaction._
import se.scalablesolutions.akka.stm.TransactionManagement._
-import se.scalablesolutions.akka.stm.{StmException, TransactionManagement}
+import se.scalablesolutions.akka.stm.TransactionManagement
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util.{HashCode, Logging, UUID}
import org.multiverse.api.ThreadLocalTransaction._
+import org.multiverse.commitbarriers.CountDownCommitBarrier
import java.util.{Queue, HashSet}
import java.util.concurrent.ConcurrentLinkedQueue
@@ -72,7 +73,7 @@ object Actor extends Logging {
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
val PORT = config.getInt("akka.remote.server.port", 9999)
- object Sender{
+ object Sender {
implicit val Self: Option[Actor] = None
}
@@ -98,9 +99,7 @@ object Actor extends Logging {
* The actor is started when created.
* Example:
*
- * import Actor._
- *
- * val a = actor {
+ * val a = Actor.init {
* ... // init stuff
* } receive {
* case msg => ... // handle message
@@ -108,8 +107,8 @@ object Actor extends Logging {
*
*
*/
- def actor(body: => Unit) = {
- def handler(body: => Unit) = new {
+ def init[A](body: => Unit) = {
+ def handler[A](body: => Unit) = new {
def receive(handler: PartialFunction[Any, Unit]) = new Actor() {
start
body
@@ -198,7 +197,7 @@ object Actor extends Logging {
*/
trait Actor extends TransactionManagement {
implicit protected val self: Option[Actor] = Some(this)
- implicit protected val transactionFamily: String = this.getClass.getName
+ implicit protected val transactionFamilyName: String = this.getClass.getName
// Only mutable for RemoteServer in order to maintain identity across nodes
private[akka] var _uuid = UUID.newUuid.toString
@@ -219,6 +218,7 @@ trait Actor extends TransactionManagement {
private[akka] var _replyToAddress: Option[InetSocketAddress] = None
private[akka] val _mailbox: Queue[MessageInvocation] = new ConcurrentLinkedQueue[MessageInvocation]
+
// ====================================
// protected fields
// ====================================
@@ -240,7 +240,7 @@ trait Actor extends TransactionManagement {
* But it can be used for advanced use-cases when one might want to store away the future and
* resolve it later and/or somewhere else.
*/
- protected var senderFuture: Option[CompletableFutureResult] = None
+ protected var senderFuture: Option[CompletableFuture] = None
// ====================================
// ==== USER CALLBACKS TO OVERRIDE ====
@@ -309,9 +309,9 @@ trait Actor extends TransactionManagement {
* If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined.
* Can be one of:
*
- * AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
+ * faultHandler = Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange))
*
- * OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
+ * faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
*
*/
protected var faultHandler: Option[FaultHandlingStrategy] = None
@@ -334,8 +334,8 @@ trait Actor extends TransactionManagement {
/**
* User overridable callback/setting.
*
- * Partial function implementing the server logic.
- * To be implemented by subclassing server.
+ * Partial function implementing the actor logic.
+ * To be implemented by subclassing actor.
*
* Example code:
*
@@ -533,7 +533,10 @@ trait Actor extends TransactionManagement {
*/
def !: Option[T] = !
- def !!!(message: Any): FutureResult = {
+ /**
+ * FIXME document !!!
+ */
+ def !!!(message: Any): Future = {
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (_isRunning) {
postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, None)
@@ -541,12 +544,6 @@ trait Actor extends TransactionManagement {
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
- /**
- * This method is evil and has been removed. Use '!!' with a timeout instead.
- */
- def !?[T](message: Any): T = throw new UnsupportedOperationException(
- "'!?' is evil and has been removed. Use '!!' with a timeout instead")
-
/**
* Forwards the message and passes the original sender actor as the sender.
*
@@ -656,9 +653,9 @@ trait Actor extends TransactionManagement {
* To be invoked from within the actor itself.
*/
protected[this] def link(actor: Actor) = {
- getLinkedActors.add(actor)
if (actor._supervisor.isDefined) throw new IllegalStateException(
"Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails")
+ getLinkedActors.add(actor)
actor._supervisor = Some(this)
Actor.log.debug("Linking actor [%s] to actor [%s]", actor, this)
}
@@ -788,6 +785,11 @@ trait Actor extends TransactionManagement {
}
protected[akka] def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = {
+ if (isTransactionSetInScope) {
+ log.trace("Adding transaction for %s with message [%s] to transaction set", toString, message)
+ getTransactionSetInScope.incParties
+ }
+
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
@@ -799,8 +801,7 @@ trait Actor extends TransactionManagement {
.setIsEscaped(false)
val id = registerSupervisorAsRemoteActor
- if(id.isDefined)
- requestBuilder.setSupervisorUuid(id.get)
+ if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
// set the source fields used to reply back to the original sender
// (i.e. not the remote proxy actor)
@@ -819,7 +820,7 @@ trait Actor extends TransactionManagement {
RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, None)
} else {
- val invocation = new MessageInvocation(this, message, None, sender, currentTransaction.get)
+ val invocation = new MessageInvocation(this, message, None, sender, transactionSet.get)
if (_isEventBased) {
_mailbox.add(invocation)
if (_isSuspended) invocation.send
@@ -827,12 +828,18 @@ trait Actor extends TransactionManagement {
else
invocation.send
}
+ clearTransactionSet
}
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
- senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = {
+ senderFuture: Option[CompletableFuture]): CompletableFuture = {
+ if (isTransactionSetInScope) {
+ log.trace("Adding transaction for %s with message [%s] to transaction set", toString, message)
+ getTransactionSetInScope.incParties
+ }
+
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
@@ -846,16 +853,18 @@ trait Actor extends TransactionManagement {
val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, senderFuture)
+ clearTransactionSet
if (future.isDefined) future.get
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
} else {
val future = if (senderFuture.isDefined) senderFuture.get
- else new DefaultCompletableFutureResult(timeout)
- val invocation = new MessageInvocation(this, message, Some(future), None, currentTransaction.get)
+ else new DefaultCompletableFuture(timeout)
+ val invocation = new MessageInvocation(this, message, Some(future), None, transactionSet.get)
if (_isEventBased) {
_mailbox.add(invocation)
invocation.send
} else invocation.send
+ clearTransactionSet
future
}
}
@@ -864,6 +873,7 @@ trait Actor extends TransactionManagement {
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods.
*/
private[akka] def invoke(messageHandle: MessageInvocation) = synchronized {
+ //log.trace("%s is invoked with message %s", toString, messageHandle)
try {
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
else dispatch(messageHandle)
@@ -875,7 +885,7 @@ trait Actor extends TransactionManagement {
}
private def dispatch[T](messageHandle: MessageInvocation) = {
- setTransaction(messageHandle.tx)
+ setTransactionSet(messageHandle.transactionSet)
val message = messageHandle.message //serializeMessage(messageHandle.message)
senderFuture = messageHandle.future
@@ -897,47 +907,59 @@ trait Actor extends TransactionManagement {
}
private def transactionalDispatch[T](messageHandle: MessageInvocation) = {
- setTransaction(messageHandle.tx)
+ var topLevelTransaction = false
+ val txSet: Option[CountDownCommitBarrier] =
+ if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet
+ else {
+ topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx
+ if (isTransactionRequiresNew) {
+ log.trace("Creating a new transaction set (top-level transaction) \nfor actor %s \nwith message %s", toString, messageHandle)
+ Some(createNewTransactionSet)
+ } else None
+ }
+ setTransactionSet(txSet)
val message = messageHandle.message //serializeMessage(messageHandle.message)
senderFuture = messageHandle.future
sender = messageHandle.sender
+ def clearTx = {
+ clearTransactionSet
+ clearTransaction
+ }
+
def proceed = {
- try {
- incrementTransaction
- if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
- else throw new IllegalArgumentException(
- "Actor " + toString + " could not process message [" + message + "]" +
- "\n\tsince no matching 'case' clause in its 'receive' method could be found")
- } finally {
- decrementTransaction
- }
+ if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
+ else throw new IllegalArgumentException(
+ toString + " could not process message [" + message + "]" +
+ "\n\tsince no matching 'case' clause in its 'receive' method could be found")
+ setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
}
try {
- if (isTransactionRequiresNew && !isTransactionInScope) {
- if (senderFuture.isEmpty) throw new StmException(
- "Can't continue transaction in a one-way fire-forget message send" +
- "\n\tE.g. using Actor '!' method or Active Object 'void' method" +
- "\n\tPlease use the Actor '!!' method or Active Object method with non-void return type")
+ if (isTransactionRequiresNew) {
atomic {
proceed
}
} else proceed
} catch {
+ case e: IllegalStateException => {}
case e =>
+ // abort transaction set
+ if (isTransactionSetInScope) try { getTransactionSetInScope.abort } catch { case e: IllegalStateException => {} }
Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
+
if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e)
- clearTransaction // need to clear currentTransaction before call to supervisor
+ clearTx // need to clear currentTransaction before call to supervisor
+
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
} finally {
- clearTransaction
+ clearTx
}
}
- private def getResultOrThrowException[T](future: FutureResult): Option[T] =
+ private def getResultOrThrowException[T](future: Future): Option[T] =
if (future.exception.isDefined) throw future.exception.get._2
else future.result.asInstanceOf[Option[T]]
@@ -1045,6 +1067,5 @@ trait Actor extends TransactionManagement {
that.asInstanceOf[Actor]._uuid == _uuid
}
- override def toString(): String = "Actor[" + id + ":" + uuid + "]"
-
+ override def toString = "Actor[" + id + ":" + uuid + "]"
}
diff --git a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala
index 1bacbf6f59..0c84d0965a 100644
--- a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala
+++ b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala
@@ -31,10 +31,10 @@ trait BootableActorLoaderService extends Bootable with Logging {
val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL
log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList)
new URLClassLoader(toDeploy.toArray, ClassLoader.getSystemClassLoader)
- } else if (getClass.getClassLoader.getResourceAsStream("akka.conf") ne null) {
+ } else if (getClass.getClassLoader.getResourceAsStream("aop.xml") ne null) {
getClass.getClassLoader
} else throw new IllegalStateException(
- "AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
+ "AKKA_HOME is not defined and akka-.jar can not be found on the classpath; aborting...")
)
}
diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
index 7da13a10b3..e115800d4b 100644
--- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
@@ -63,10 +63,10 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
executor.execute(new Runnable() {
def run = {
invocation.receiver.synchronized {
- val messages = invocation.receiver._mailbox.iterator
- while (messages.hasNext) {
- messages.next.asInstanceOf[MessageInvocation].invoke
- messages.remove
+ var messageInvocation = invocation.receiver._mailbox.poll
+ while (messageInvocation != null) {
+ messageInvocation.invoke
+ messageInvocation = invocation.receiver._mailbox.poll
}
}
}
diff --git a/akka-core/src/main/scala/dispatch/Future.scala b/akka-core/src/main/scala/dispatch/Future.scala
index c1e61695b8..0bf9723e31 100644
--- a/akka-core/src/main/scala/dispatch/Future.scala
+++ b/akka-core/src/main/scala/dispatch/Future.scala
@@ -2,22 +2,38 @@
* Copyright (C) 2009-2010 Scalable Solutions AB
*/
-/**
- * Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
- */
package se.scalablesolutions.akka.dispatch
import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.{SynchronousQueue, TimeUnit}
+import java.util.concurrent.TimeUnit
class FutureTimeoutException(message: String) extends RuntimeException(message)
object Futures {
- def awaitAll(futures: List[FutureResult]): Unit = futures.foreach(_.await)
- def awaitOne(futures: List[FutureResult]): FutureResult = {
- var future: Option[FutureResult] = None
+ /**
+ * FIXME document
+ *
+ * val future = Futures.future(1000) {
+ * ... // do stuff
+ * }
+ *
+ */
+ def future(timeout: Long)(body: => Any): Future = {
+ val promise = new DefaultCompletableFuture(timeout)
+ try {
+ promise completeWithResult body
+ } catch {
+ case e => promise completeWithException (None, e)
+ }
+ promise
+ }
+
+ def awaitAll(futures: List[Future]): Unit = futures.foreach(_.await)
+
+ def awaitOne(futures: List[Future]): Future = {
+ var future: Option[Future] = None
do {
future = futures.find(_.isCompleted)
} while (future.isEmpty)
@@ -25,7 +41,7 @@ object Futures {
}
/*
- def awaitEither(f1: FutureResult, f2: FutureResult): Option[Any] = {
+ def awaitEither(f1: Future, f2: Future): Option[Any] = {
import Actor.Sender.Self
import Actor.{spawn, actor}
@@ -54,7 +70,7 @@ object Futures {
*/
}
-sealed trait FutureResult {
+sealed trait Future {
def await
def awaitBlocking
def isCompleted: Boolean
@@ -64,12 +80,13 @@ sealed trait FutureResult {
def exception: Option[Tuple2[AnyRef, Throwable]]
}
-trait CompletableFutureResult extends FutureResult {
+trait CompletableFuture extends Future {
def completeWithResult(result: Any)
def completeWithException(toBlame: AnyRef, exception: Throwable)
}
-class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureResult {
+// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
+class DefaultCompletableFuture(timeout: Long) extends CompletableFuture {
private val TIME_UNIT = TimeUnit.MILLISECONDS
def this() = this(0)
diff --git a/akka-core/src/main/scala/dispatch/Reactor.scala b/akka-core/src/main/scala/dispatch/Reactor.scala
index f7bfa52215..627d27aeac 100644
--- a/akka-core/src/main/scala/dispatch/Reactor.scala
+++ b/akka-core/src/main/scala/dispatch/Reactor.scala
@@ -7,16 +7,17 @@ package se.scalablesolutions.akka.dispatch
import java.util.List
import se.scalablesolutions.akka.util.{HashCode, Logging}
-import se.scalablesolutions.akka.stm.Transaction
import se.scalablesolutions.akka.actor.Actor
import java.util.concurrent.ConcurrentHashMap
+import org.multiverse.commitbarriers.CountDownCommitBarrier
+
final class MessageInvocation(val receiver: Actor,
val message: Any,
- val future: Option[CompletableFutureResult],
+ val future: Option[CompletableFuture],
val sender: Option[Actor],
- val tx: Option[Transaction]) {
+ val transactionSet: Option[CountDownCommitBarrier]) {
if (receiver eq null) throw new IllegalArgumentException("receiver is null")
def invoke = receiver.invoke(this)
@@ -37,13 +38,13 @@ final class MessageInvocation(val receiver: Actor,
that.asInstanceOf[MessageInvocation].message == message
}
- override def toString(): String = synchronized {
+ override def toString = synchronized {
"MessageInvocation[" +
"\n\tmessage = " + message +
"\n\treceiver = " + receiver +
"\n\tsender = " + sender +
"\n\tfuture = " + future +
- "\n\ttx = " + tx +
+ "\n\ttransactionSet = " + transactionSet +
"\n]"
}
}
diff --git a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala
index cb465907cb..1fedc1a5d7 100644
--- a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala
+++ b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala
@@ -4,11 +4,11 @@
package se.scalablesolutions.akka.dispatch
+import java.util.Collection
import java.util.concurrent._
import atomic.{AtomicLong, AtomicInteger}
import ThreadPoolExecutor.CallerRunsPolicy
-import java.util.Collection
import se.scalablesolutions.akka.util.Logging
trait ThreadPoolBuilder {
diff --git a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala
index 429fdb61ec..6c3183ef8c 100644
--- a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala
+++ b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala
@@ -24,7 +24,6 @@ trait BootableRemoteActorService extends Bootable with Logging {
abstract override def onLoad = {
if(config.getBool("akka.remote.server.service", true)){
- log.info("Starting up Cluster Service")
Cluster.start
super.onLoad //Initialize BootableActorLoaderService before remote service
log.info("Initializing Remote Actors Service...")
@@ -35,14 +34,21 @@ trait BootableRemoteActorService extends Bootable with Logging {
super.onLoad
}
-
+
abstract override def onUnload = {
- super.onUnload
- if (remoteServerThread.isAlive) {
- log.info("Shutting down Remote Actors Service")
- RemoteNode.shutdown
- remoteServerThread.join(1000)
- }
+ super.onUnload
+
+ log.info("Shutting down Remote Actors Service")
+
+ RemoteNode.shutdown
+
+ if (remoteServerThread.isAlive)
+ remoteServerThread.join(1000)
+
+ log.info("Shutting down Cluster")
Cluster.shutdown
+
+ log.info("Remote Actors Service has been shut down")
}
+
}
\ No newline at end of file
diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala
index c2e9069a01..fb14b6b357 100644
--- a/akka-core/src/main/scala/remote/Cluster.scala
+++ b/akka-core/src/main/scala/remote/Cluster.scala
@@ -26,8 +26,8 @@ trait Cluster {
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit
def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T]
-
- def foreach(f : (RemoteAddress) => Unit) : Unit
+
+ def foreach(f: (RemoteAddress) => Unit): Unit
}
/**
@@ -48,7 +48,15 @@ private[remote] object ClusterActor {
sealed trait ClusterMessage
private[remote] case class RelayedMessage(actorClassFQN: String, msg: AnyRef) extends ClusterMessage
-
+ private[remote] case class Message[ADDR_T](sender: ADDR_T, msg: Array[Byte])
+ private[remote] case object PapersPlease extends ClusterMessage
+ private[remote] case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage
+ private[remote] case object Block extends ClusterMessage
+ private[remote] case object Unblock extends ClusterMessage
+ private[remote] case class View[ADDR_T](othersPresent: Set[ADDR_T]) extends ClusterMessage
+ private[remote] case class Zombie[ADDR_T](address: ADDR_T) extends ClusterMessage
+ private[remote] case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage
+ private[remote] case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage
private[remote] case class Node(endpoints: List[RemoteAddress])
}
@@ -59,76 +67,65 @@ private[remote] object ClusterActor {
*/
abstract class BasicClusterActor extends ClusterActor {
import ClusterActor._
-
- case class Message(sender : ADDR_T,msg : Array[Byte])
- case object PapersPlease extends ClusterMessage
- case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage
- case object Block extends ClusterMessage
- case object Unblock extends ClusterMessage
- case class View(othersPresent : Set[ADDR_T]) extends ClusterMessage
- case class Zombie(address: ADDR_T) extends ClusterMessage
- case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage
- case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage
-
type ADDR_T
-
@volatile private var local: Node = Node(Nil)
@volatile private var remotes: Map[ADDR_T, Node] = Map()
override def init = {
- remotes = new HashMap[ADDR_T, Node]
+ remotes = new HashMap[ADDR_T, Node]
}
override def shutdown = {
- remotes = Map()
+ remotes = Map()
}
def receive = {
- case v @ View(members) => {
+ case v: View[ADDR_T] => {
// Not present in the cluster anymore = presumably zombies
// Nodes we have no prior knowledge existed = unknowns
- val zombies = Set[ADDR_T]() ++ remotes.keySet -- members
- val unknown = members -- remotes.keySet
+ val zombies = Set[ADDR_T]() ++ remotes.keySet -- v.othersPresent
+ val unknown = v.othersPresent -- remotes.keySet
log debug ("Updating view")
- log debug ("Other memebers: [%s]",members)
- log debug ("Zombies: [%s]",zombies)
- log debug ("Unknowns: [%s]",unknown)
+ log debug ("Other memebers: [%s]", v.othersPresent)
+ log debug ("Zombies: [%s]", zombies)
+ log debug ("Unknowns: [%s]", unknown)
// Tell the zombies and unknowns to provide papers and prematurely treat the zombies as dead
broadcast(zombies ++ unknown, PapersPlease)
remotes = remotes -- zombies
}
- case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead
- log debug ("Killing Zombie Node: %s", x)
- broadcast(x :: Nil, PapersPlease)
- remotes = remotes - x
+ case z: Zombie[ADDR_T] => { //Ask the presumed zombie for papers and prematurely treat it as dead
+ log debug ("Killing Zombie Node: %s", z.address)
+ broadcast(z.address :: Nil, PapersPlease)
+ remotes = remotes - z.address
}
- case rm @ RelayedMessage(_, _) => {
+ case rm@RelayedMessage(_, _) => {
log debug ("Relaying message: %s", rm)
broadcast(rm)
}
- case m @ Message(src,msg) => {
- (Cluster.serializer in (msg, None)) match {
+ case m: Message[ADDR_T] => {
+ val (src, msg) = (m.sender, m.msg)
+ (Cluster.serializer in (msg, None)) match {
- case PapersPlease => {
- log debug ("Asked for papers by %s", src)
- broadcast(src :: Nil, Papers(local.endpoints))
+ case PapersPlease => {
+ log debug ("Asked for papers by %s", src)
+ broadcast(src :: Nil, Papers(local.endpoints))
- if (remotes.get(src).isEmpty) // If we were asked for papers from someone we don't know, ask them!
- broadcast(src :: Nil, PapersPlease)
- }
-
- case Papers(x) => remotes = remotes + (src -> Node(x))
-
- case RelayedMessage(c, m) => ActorRegistry.actorsFor(c).foreach(_ send m)
-
- case unknown => log debug ("Unknown message: %s", unknown.toString)
+ if (remotes.get(src).isEmpty) // If we were asked for papers from someone we don't know, ask them!
+ broadcast(src :: Nil, PapersPlease)
}
+
+ case Papers(x) => remotes = remotes + (src -> Node(x))
+
+ case RelayedMessage(c, m) => ActorRegistry.actorsFor(c).foreach(_ send m)
+
+ case unknown => log debug ("Unknown message: %s", unknown.toString)
+ }
}
case RegisterLocalNode(s) => {
@@ -147,12 +144,12 @@ abstract class BasicClusterActor extends ClusterActor {
/**
* Implement this in a subclass to add node-to-node messaging
*/
- protected def toOneNode(dest : ADDR_T, msg : Array[Byte]) : Unit
+ protected def toOneNode(dest: ADDR_T, msg: Array[Byte]): Unit
/**
* Implement this in a subclass to add node-to-many-nodes messaging
*/
- protected def toAllNodes(msg : Array[Byte]) : Unit
+ protected def toAllNodes(msg: Array[Byte]): Unit
/**
* Sends the specified message to the given recipients using the serializer
@@ -160,7 +157,7 @@ abstract class BasicClusterActor extends ClusterActor {
*/
protected def broadcast[T <: AnyRef](recipients: Iterable[ADDR_T], msg: T): Unit = {
lazy val m = Cluster.serializer out msg
- for (r <- recipients) toOneNode(r,m)
+ for (r <- recipients) toOneNode(r, m)
}
/**
@@ -175,11 +172,11 @@ abstract class BasicClusterActor extends ClusterActor {
*/
def lookup[T](handleRemoteAddress: PartialFunction[RemoteAddress, T]): Option[T] =
remotes.values.toList.flatMap(_.endpoints).find(handleRemoteAddress isDefinedAt _).map(handleRemoteAddress)
-
+
/**
* Applies the given function to all remote addresses known
*/
- def foreach(f : (RemoteAddress) => Unit) : Unit = remotes.values.toList.flatMap(_.endpoints).foreach(f)
+ def foreach(f: (RemoteAddress) => Unit): Unit = remotes.values.toList.flatMap(_.endpoints).foreach(f)
/**
* Registers a local endpoint
@@ -206,36 +203,38 @@ abstract class BasicClusterActor extends ClusterActor {
* Loads a specified ClusterActor and delegates to that instance.
*/
object Cluster extends Cluster with Logging {
+ lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName
+
@volatile private[remote] var clusterActor: Option[ClusterActor] = None
+
+ // FIXME Use the supervisor member field
@volatile private[remote] var supervisor: Option[Supervisor] = None
-
- private[remote] lazy val serializer: Serializer = {
- val className = config.getString("akka.remote.cluster.serializer", Serializer.Java.getClass.getName)
- Class.forName(className).newInstance.asInstanceOf[Serializer]
- }
- private[remote] def createClusterActor : Option[ClusterActor] = {
+ private[remote] lazy val serializer: Serializer =
+ Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME))
+ .newInstance.asInstanceOf[Serializer]
+
+ private[remote] def createClusterActor: Option[ClusterActor] = {
val name = config.getString("akka.remote.cluster.actor")
-
+ if (name.isEmpty) throw new IllegalArgumentException(
+ "Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined")
try {
- name map { fqn =>
- val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor]
- a.start
- a
+ name map {
+ fqn =>
+ Class.forName(fqn).newInstance.asInstanceOf[ClusterActor]
}
}
catch {
- case e => log.error(e,"Couldn't load Cluster provider: [%s]",name.getOrElse("Not specified")); None
+ case e => log.error(e, "Couldn't load Cluster provider: [%s]", name.getOrElse("Not specified")); None
}
}
- private[remote] def createSupervisor(actor : ClusterActor) : Option[Supervisor] = {
+ private[remote] def createSupervisor(actor: ClusterActor): Option[Supervisor] = {
val sup = SupervisorFactory(
SupervisorConfig(
RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])),
Supervise(actor, LifeCycle(Permanent)) :: Nil)
).newInstance
- sup.start
Some(sup)
}
@@ -249,20 +248,23 @@ object Cluster extends Cluster with Logging {
def deregisterLocalNode(hostname: String, port: Int): Unit = clusterActor.foreach(_.deregisterLocalNode(hostname, port))
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActor.foreach(_.relayMessage(to, msg))
-
- def foreach(f : (RemoteAddress) => Unit) : Unit = clusterActor.foreach(_.foreach(f))
- def start : Unit = synchronized {
- if(supervisor.isEmpty) {
- for(actor <- createClusterActor;
- sup <- createSupervisor(actor)) {
- clusterActor = Some(actor)
- supervisor = Some(sup)
+ def foreach(f: (RemoteAddress) => Unit): Unit = clusterActor.foreach(_.foreach(f))
+
+ def start: Unit = synchronized {
+ log.info("Starting up Cluster Service...")
+ if (supervisor.isEmpty) {
+ for (actor <- createClusterActor;
+ sup <- createSupervisor(actor)) {
+ clusterActor = Some(actor)
+ supervisor = Some(sup)
+ sup.start
}
}
}
- def shutdown : Unit = synchronized {
+ def shutdown: Unit = synchronized {
+ log.info("Shutting down Cluster Service...")
supervisor.foreach(_.stop)
supervisor = None
clusterActor = None
diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala
index f97f014f06..0887ebcd82 100644
--- a/akka-core/src/main/scala/remote/RemoteClient.scala
+++ b/akka-core/src/main/scala/remote/RemoteClient.scala
@@ -6,7 +6,7 @@ package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
import se.scalablesolutions.akka.actor.{Exit, Actor}
-import se.scalablesolutions.akka.dispatch.{DefaultCompletableFutureResult, CompletableFutureResult}
+import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
import se.scalablesolutions.akka.util.{UUID, Logging}
import se.scalablesolutions.akka.Config.config
@@ -86,7 +86,7 @@ object RemoteClient extends Logging {
override def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
- senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = {
+ senderFuture: Option[CompletableFuture]): CompletableFuture = {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(className)
@@ -168,7 +168,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
val name = "RemoteClient@" + hostname + "::" + port
@volatile private[remote] var isRunning = false
- private val futures = new ConcurrentHashMap[Long, CompletableFutureResult]
+ private val futures = new ConcurrentHashMap[Long, CompletableFuture]
private val supervisors = new ConcurrentHashMap[String, Actor]
private val channelFactory = new NioClientSocketChannelFactory(
@@ -208,14 +208,14 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
}
}
- def send(request: RemoteRequest, senderFuture: Option[CompletableFutureResult]): Option[CompletableFutureResult] = if (isRunning) {
+ def send(request: RemoteRequest, senderFuture: Option[CompletableFuture]): Option[CompletableFuture] = if (isRunning) {
if (request.getIsOneWay) {
connection.getChannel.write(request)
None
} else {
futures.synchronized {
val futureResult = if (senderFuture.isDefined) senderFuture.get
- else new DefaultCompletableFutureResult(request.getTimeout)
+ else new DefaultCompletableFuture(request.getTimeout)
futures.put(request.getId, futureResult)
connection.getChannel.write(request)
Some(futureResult)
@@ -238,7 +238,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
* @author Jonas Bonér
*/
class RemoteClientPipelineFactory(name: String,
- futures: ConcurrentMap[Long, CompletableFutureResult],
+ futures: ConcurrentMap[Long, CompletableFuture],
supervisors: ConcurrentMap[String, Actor],
bootstrap: ClientBootstrap,
remoteAddress: SocketAddress,
@@ -269,7 +269,7 @@ class RemoteClientPipelineFactory(name: String,
*/
@ChannelPipelineCoverage {val value = "all"}
class RemoteClientHandler(val name: String,
- val futures: ConcurrentMap[Long, CompletableFutureResult],
+ val futures: ConcurrentMap[Long, CompletableFuture],
val supervisors: ConcurrentMap[String, Actor],
val bootstrap: ClientBootstrap,
val remoteAddress: SocketAddress,
diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala
index 6da2ceea99..02cf98bcd2 100644
--- a/akka-core/src/main/scala/remote/RemoteServer.scala
+++ b/akka-core/src/main/scala/remote/RemoteServer.scala
@@ -58,7 +58,7 @@ object RemoteNode extends RemoteServer
*/
object RemoteServer {
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
- val PORT = config.getInt("akka.remote.server.port", 9999)
+ val PORT = config.getInt("akka.remote.server.port", 9966)
val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.server.connection-timeout", 1000)
diff --git a/akka-core/src/main/scala/stm/DataFlowVariable.scala b/akka-core/src/main/scala/stm/DataFlowVariable.scala
index aa5a8255e4..daed4ec55f 100644
--- a/akka-core/src/main/scala/stm/DataFlowVariable.scala
+++ b/akka-core/src/main/scala/stm/DataFlowVariable.scala
@@ -8,7 +8,7 @@ import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
import se.scalablesolutions.akka.actor.Actor
-import se.scalablesolutions.akka.dispatch.CompletableFutureResult
+import se.scalablesolutions.akka.dispatch.CompletableFuture
/**
* Implements Oz-style dataflow (single assignment) variables.
@@ -74,7 +74,7 @@ object DataFlow {
private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
timeout = TIME_OUT
start
- private var readerFuture: Option[CompletableFutureResult] = None
+ private var readerFuture: Option[CompletableFuture] = None
def receive = {
case Get =>
val ref = dataFlow.value.get
diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala
index 1637b4c906..133c292a6f 100644
--- a/akka-core/src/main/scala/stm/Transaction.scala
+++ b/akka-core/src/main/scala/stm/Transaction.scala
@@ -7,15 +7,18 @@ package se.scalablesolutions.akka.stm
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.mutable.HashMap
+
import se.scalablesolutions.akka.state.Committable
import se.scalablesolutions.akka.util.Logging
import org.multiverse.api.{Transaction => MultiverseTransaction}
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
import org.multiverse.api.ThreadLocalTransaction._
-import org.multiverse.templates.OrElseTemplate
-
-import scala.collection.mutable.HashMap
+import org.multiverse.templates.{TransactionTemplate, OrElseTemplate}
+import org.multiverse.utils.backoff.ExponentialBackoffPolicy
+import org.multiverse.stms.alpha.AlphaStm
+import java.util.concurrent.TimeUnit
class NoTransactionInScopeException extends RuntimeException
class TransactionRetryException(message: String) extends RuntimeException(message)
@@ -30,8 +33,8 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
* Here are some examples (assuming implicit transaction family name in scope):
*
* import se.scalablesolutions.akka.stm.Transaction._
- *
- * atomic {
+ *
+ * atomic {
* .. // do something within a transaction
* }
*
@@ -39,8 +42,8 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
* Example of atomic transaction management using atomic block with retry count:
*
* import se.scalablesolutions.akka.stm.Transaction._
- *
- * atomic(maxNrOfRetries) {
+ *
+ * atomic(maxNrOfRetries) {
* .. // do something within a transaction
* }
*
@@ -49,10 +52,10 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
* Which is a good way to reduce contention and transaction collisions.
*
* import se.scalablesolutions.akka.stm.Transaction._
- *
- * atomically {
+ *
+ * atomically {
* .. // try to do something
- * } orElse {
+ * } orElse {
* .. // if transaction clashes try do do something else to minimize contention
* }
*
@@ -61,11 +64,11 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
*
*
* import se.scalablesolutions.akka.stm.Transaction._
- * for (tx <- Transaction) {
+ * for (tx <- Transaction) {
* ... // do transactional stuff
* }
*
- * val result = for (tx <- Transaction) yield {
+ * val result = for (tx <- Transaction) yield {
* ... // do transactional stuff yielding a result
* }
*
@@ -78,17 +81,17 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
*
* // You can use them together with Transaction in a for comprehension since
* // TransactionalRef is also monadic
- * for {
+ * for {
* tx <- Transaction
* ref <- refs
* } {
* ... // use the ref inside a transaction
* }
*
- * val result = for {
+ * val result = for {
* tx <- Transaction
* ref <- refs
- * } yield {
+ * } yield {
* ... // use the ref inside a transaction, yield a result
* }
*
@@ -97,101 +100,87 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
*/
object Transaction extends TransactionManagement {
val idFactory = new AtomicLong(-1L)
+/*
+ import AlphaStm._
+ private val defaultTxBuilder = new AlphaTransactionFactoryBuilder
+ defaultTxBuilder.setReadonly(false)
+ defaultTxBuilder.setInterruptible(INTERRUPTIBLE)
+ defaultTxBuilder.setMaxRetryCount(MAX_NR_OF_RETRIES)
+ defaultTxBuilder.setPreventWriteSkew(PREVENT_WRITE_SKEW)
+ defaultTxBuilder.setAutomaticReadTracking(AUTOMATIC_READ_TRACKING)
+ defaultTxBuilder.setSmartTxLengthSelector(SMART_TX_LENGTH_SELECTOR)
+ defaultTxBuilder.setBackoffPolicy(new ExponentialBackoffPolicy)
+ private val readOnlyTxBuilder = new AlphaStm.AlphaTransactionFactoryBuilder
+ readOnlyTxBuilder.setReadonly(true)
+ readOnlyTxBuilder.setInterruptible(INTERRUPTIBLE)
+ readOnlyTxBuilder.setMaxRetryCount(MAX_NR_OF_RETRIES)
+ readOnlyTxBuilder.setPreventWriteSkew(PREVENT_WRITE_SKEW)
+ readOnlyTxBuilder.setAutomaticReadTracking(AUTOMATIC_READ_TRACKING)
+ readOnlyTxBuilder.setSmartTxLengthSelector(SMART_TX_LENGTH_SELECTOR)
+ readOnlyTxBuilder.setBackoffPolicy(new ExponentialBackoffPolicy)
+ */
+ /**
+ * See ScalaDoc on class.
+ */
+ def map[T](f: => T)(implicit transactionFamilyName: String): T =
+ atomic {f}
/**
* See ScalaDoc on class.
*/
- def map[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic { f(getTransactionInScope) }
+ def flatMap[T](f: => T)(implicit transactionFamilyName: String): T =
+ atomic {f}
/**
* See ScalaDoc on class.
*/
- def flatMap[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic { f(getTransactionInScope) }
-
- /**
- * See ScalaDoc on class.
- */
- def foreach(f: Transaction => Unit)(implicit transactionFamilyName: String): Unit = atomic { f(getTransactionInScope) }
+ def foreach(f: => Unit)(implicit transactionFamilyName: String): Unit =
+ atomic {f}
/**
* Creates a "pure" STM atomic transaction and by-passes all transactions hooks
* such as persistence etc.
* Only for internal usage.
*/
- private[akka] def pureAtomic[T](body: => T): T = new AtomicTemplate[T](
- getGlobalStmInstance, "internal", false, false, TransactionManagement.MAX_NR_OF_RETRIES) {
+ private[akka] def pureAtomic[T](body: => T): T = new TransactionTemplate[T]() {
def execute(mtx: MultiverseTransaction): T = body
}.execute()
/**
* See ScalaDoc on class.
*/
- def atomic[T](body: => T)(implicit transactionFamilyName: String): T = new AtomicTemplate[T](
- getGlobalStmInstance, transactionFamilyName, false, false, TransactionManagement.MAX_NR_OF_RETRIES) {
- def execute(mtx: MultiverseTransaction): T = body
- override def postStart(mtx: MultiverseTransaction) = {
- val tx = new Transaction
- tx.transaction = Some(mtx)
- setTransaction(Some(tx))
- }
- override def postCommit = {
- if (isTransactionInScope) getTransactionInScope.commit
- else throw new IllegalStateException("No transaction in scope")
- }
- }.execute()
+ def atomic[T](body: => T)(implicit transactionFamilyName: String): T = {
+ // defaultTxBuilder.setFamilyName(transactionFamilyName)
+ // new TransactionTemplate[T](defaultTxBuilder.build) {
+ new TransactionTemplate[T]() { // FIXME take factory
+ def execute(mtx: MultiverseTransaction): T = {
+ val result = body
- /**
- * See ScalaDoc on class.
- */
- def atomic[T](retryCount: Int)(body: => T)(implicit transactionFamilyName: String): T = {
- new AtomicTemplate[T](getGlobalStmInstance, transactionFamilyName, false, false, retryCount) {
- def execute(mtx: MultiverseTransaction): T = body
- override def postStart(mtx: MultiverseTransaction) = {
+ log.trace("Committing transaction [%s] \nwith family name [%s] \nby joining transaction set")
+ getTransactionSetInScope.joinCommit(mtx)
+
+ // FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
+ //getTransactionSetInScope.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
+
+ clearTransaction
+ result
+ }
+
+ override def onStart(mtx: MultiverseTransaction) = {
+ val txSet = if (!isTransactionSetInScope) createNewTransactionSet
+ else getTransactionSetInScope
val tx = new Transaction
tx.transaction = Some(mtx)
setTransaction(Some(tx))
- }
- override def postCommit = {
- if (isTransactionInScope) getTransactionInScope.commit
- else throw new IllegalStateException("No transaction in scope")
- }
- }.execute
- }
- /**
- * See ScalaDoc on class.
- */
- def atomicReadOnly[T](retryCount: Int)(body: => T)(implicit transactionFamilyName: String): T = {
- new AtomicTemplate[T](getGlobalStmInstance, transactionFamilyName, false, true, retryCount) {
- def execute(mtx: MultiverseTransaction): T = body
- override def postStart(mtx: MultiverseTransaction) = {
- val tx = new Transaction
- tx.transaction = Some(mtx)
- setTransaction(Some(tx))
+ txSet.registerOnCommitTask(new Runnable() {
+ def run = tx.commit
+ })
+ txSet.registerOnAbortTask(new Runnable() {
+ def run = tx.abort
+ })
}
- override def postCommit = {
- if (isTransactionInScope) getTransactionInScope.commit
- else throw new IllegalStateException("No transaction in scope")
- }
- }.execute
- }
-
- /**
- * See ScalaDoc on class.
- */
- def atomicReadOnly[T](body: => T): T = {
- new AtomicTemplate[T](true) {
- def execute(mtx: MultiverseTransaction): T = body
- override def postStart(mtx: MultiverseTransaction) = {
- val tx = new Transaction
- tx.transaction = Some(mtx)
- setTransaction(Some(tx))
- }
- override def postCommit = {
- if (isTransactionInScope) getTransactionInScope.commit
- else throw new IllegalStateException("No transaction in scope")
- }
- }.execute
+ }.execute()
}
/**
@@ -216,23 +205,28 @@ object Transaction extends TransactionManagement {
*/
@serializable class Transaction extends Logging {
import Transaction._
-
+
+ log.trace("Creating %s", toString)
val id = Transaction.idFactory.incrementAndGet
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
private[akka] var transaction: Option[MultiverseTransaction] = None
private[this] val persistentStateMap = new HashMap[String, Committable]
private[akka] val depth = new AtomicInteger(0)
-
+
// --- public methods ---------
def commit = synchronized {
+ log.trace("Committing transaction %s", toString)
pureAtomic {
persistentStateMap.values.foreach(_.commit)
- TransactionManagement.clearTransaction
}
status = TransactionStatus.Completed
}
+ def abort = synchronized {
+ log.trace("Aborting transaction %s", toString)
+ }
+
def isNew = synchronized { status == TransactionStatus.New }
def isActive = synchronized { status == TransactionStatus.Active }
@@ -259,13 +253,13 @@ object Transaction extends TransactionManagement {
private def ensureIsActiveOrAborted =
if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted))
- throw new IllegalStateException(
- "Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString)
+ throw new IllegalStateException(
+ "Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString)
private def ensureIsActiveOrNew =
if (!(status == TransactionStatus.Active || status == TransactionStatus.New))
- throw new IllegalStateException(
- "Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
+ throw new IllegalStateException(
+ "Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
// For reinitialize transaction after sending it over the wire
private[akka] def reinit = synchronized {
@@ -277,14 +271,14 @@ object Transaction extends TransactionManagement {
}
override def equals(that: Any): Boolean = synchronized {
- that != null &&
- that.isInstanceOf[Transaction] &&
- that.asInstanceOf[Transaction].id == this.id
+ that != null &&
+ that.isInstanceOf[Transaction] &&
+ that.asInstanceOf[Transaction].id == this.id
}
-
+
override def hashCode(): Int = synchronized { id.toInt }
-
- override def toString(): String = synchronized { "Transaction[" + id + ", " + status + "]" }
+
+ override def toString = synchronized { "Transaction[" + id + ", " + status + "]" }
}
/**
diff --git a/akka-core/src/main/scala/stm/TransactionManagement.scala b/akka-core/src/main/scala/stm/TransactionManagement.scala
index 2dd7ed9c79..60a6ae6de3 100644
--- a/akka-core/src/main/scala/stm/TransactionManagement.scala
+++ b/akka-core/src/main/scala/stm/TransactionManagement.scala
@@ -9,51 +9,80 @@ import java.util.concurrent.atomic.AtomicBoolean
import se.scalablesolutions.akka.util.Logging
import org.multiverse.api.ThreadLocalTransaction._
+import org.multiverse.commitbarriers.CountDownCommitBarrier
class StmException(msg: String) extends RuntimeException(msg)
-class TransactionAwareWrapperException(
- val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
- override def toString(): String = "TransactionAwareWrapperException[" + cause + ", " + tx + "]"
+class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
+ override def toString = "TransactionAwareWrapperException[" + cause + ", " + tx + "]"
}
object TransactionManagement extends TransactionManagement {
import se.scalablesolutions.akka.Config._
- val MAX_NR_OF_RETRIES = config.getInt("akka.stm.max-nr-of-retries", 100)
- val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", false))
-
+ val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", false))
+ val FAIR_TRANSACTIONS = config.getBool("akka.stm.fair", true)
+ val INTERRUPTIBLE = config.getBool("akka.stm.interruptible", true)
+ val MAX_NR_OF_RETRIES = config.getInt("akka.stm.max-nr-of-retries", 1000)
+ val TRANSACTION_TIMEOUT = config.getInt("akka.stm.timeout", 10000)
+ val SMART_TX_LENGTH_SELECTOR = config.getBool("akka.stm.smart-tx-length-selector", true)
def isTransactionalityEnabled = TRANSACTION_ENABLED.get
+
def disableTransactions = TRANSACTION_ENABLED.set(false)
- private[akka] val currentTransaction: ThreadLocal[Option[Transaction]] = new ThreadLocal[Option[Transaction]]() {
+ private[akka] val transactionSet = new ThreadLocal[Option[CountDownCommitBarrier]]() {
+ override protected def initialValue: Option[CountDownCommitBarrier] = None
+ }
+
+ private[akka] val transaction = new ThreadLocal[Option[Transaction]]() {
override protected def initialValue: Option[Transaction] = None
}
+
+ private[akka] def getTransactionSet: CountDownCommitBarrier = {
+ val option = transactionSet.get
+ if ((option eq null) || option.isEmpty) throw new IllegalStateException("No TransactionSet in scope")
+ option.get
+ }
+
+ private[akka] def getTransaction: Transaction = {
+ val option = transaction.get
+ if ((option eq null) || option.isEmpty) throw new IllegalStateException("No Transaction in scope")
+ option.get
+ }
}
trait TransactionManagement extends Logging {
- import TransactionManagement.currentTransaction
- private[akka] def createNewTransaction = currentTransaction.set(Some(new Transaction))
-
- private[akka] def setTransaction(transaction: Option[Transaction]) = if (transaction.isDefined) {
- val tx = transaction.get
- currentTransaction.set(transaction)
- if (tx.transaction.isDefined) setThreadLocalTransaction(tx.transaction.get)
- else throw new IllegalStateException("No transaction defined")
+ private[akka] def createNewTransactionSet: CountDownCommitBarrier = {
+ val txSet = new CountDownCommitBarrier(1, TransactionManagement.FAIR_TRANSACTIONS)
+ TransactionManagement.transactionSet.set(Some(txSet))
+ txSet
}
+ private[akka] def setTransactionSet(txSet: Option[CountDownCommitBarrier]) =
+ if (txSet.isDefined) TransactionManagement.transactionSet.set(txSet)
+
+ private[akka] def setTransaction(tx: Option[Transaction]) =
+ if (tx.isDefined) TransactionManagement.transaction.set(tx)
+
+ private[akka] def clearTransactionSet = TransactionManagement.transactionSet.set(None)
+
private[akka] def clearTransaction = {
- currentTransaction.set(None)
+ TransactionManagement.transaction.set(None)
setThreadLocalTransaction(null)
}
- private[akka] def getTransactionInScope = currentTransaction.get.get
-
- private[akka] def isTransactionInScope = currentTransaction.get.isDefined
+ private[akka] def getTransactionSetInScope = TransactionManagement.getTransactionSet
- private[akka] def incrementTransaction = if (isTransactionInScope) getTransactionInScope.increment
+ private[akka] def getTransactionInScope = TransactionManagement.getTransaction
- private[akka] def decrementTransaction = if (isTransactionInScope) getTransactionInScope.decrement
-}
+ private[akka] def isTransactionSetInScope = {
+ val option = TransactionManagement.transactionSet.get
+ (option ne null) && option.isDefined
+ }
+ private[akka] def isTransactionInScope = {
+ val option = TransactionManagement.transaction.get
+ (option ne null) && option.isDefined
+ }
+}
\ No newline at end of file
diff --git a/akka-core/src/main/scala/stm/TransactionalState.scala b/akka-core/src/main/scala/stm/TransactionalState.scala
index 6003a89f89..1b52faf969 100644
--- a/akka-core/src/main/scala/stm/TransactionalState.scala
+++ b/akka-core/src/main/scala/stm/TransactionalState.scala
@@ -8,8 +8,7 @@ import se.scalablesolutions.akka.stm.Transaction.atomic
import se.scalablesolutions.akka.stm.NoTransactionInScopeException
import se.scalablesolutions.akka.collection._
import se.scalablesolutions.akka.util.UUID
-
-import org.multiverse.datastructures.refs.manual.Ref;
+import org.multiverse.stms.alpha.AlphaRef
/**
* Example Scala usage:
@@ -78,7 +77,7 @@ class TransactionalRef[T] extends Transactional {
implicit val txInitName = "TransactionalRef:Init"
val uuid = UUID.newUuid.toString
- private[this] val ref: Ref[T] = atomic { new Ref }
+ private[this] lazy val ref: AlphaRef[T] = new AlphaRef
def swap(elem: T) = {
ensureIsInTransaction
diff --git a/akka-core/src/test/scala/ActorRegistryTest.scala b/akka-core/src/test/scala/ActorRegistryTest.scala
index faa4a46b18..ada0c027d5 100644
--- a/akka-core/src/test/scala/ActorRegistryTest.scala
+++ b/akka-core/src/test/scala/ActorRegistryTest.scala
@@ -10,6 +10,7 @@ class ActorRegistryTest extends JUnitSuite {
def receive = {
case "ping" =>
record = "pong" + record
+ reply("got ping")
}
}
@@ -128,8 +129,7 @@ class ActorRegistryTest extends JUnitSuite {
val actor2 = new TestActor
actor2.start
record = ""
- ActorRegistry.foreach(actor => actor send "ping")
- Thread.sleep(1000)
+ ActorRegistry.foreach(actor => actor !! "ping")
assert(record === "pongpong")
actor1.stop
actor2.stop
diff --git a/akka-core/src/test/scala/InMemoryActorTest.scala b/akka-core/src/test/scala/InMemoryActorTest.scala
index cd06b80d0a..d4be98fcaa 100644
--- a/akka-core/src/test/scala/InMemoryActorTest.scala
+++ b/akka-core/src/test/scala/InMemoryActorTest.scala
@@ -23,7 +23,7 @@ case class SuccessOneWay(key: String, value: String)
case class FailureOneWay(key: String, value: String, failer: Actor)
class InMemStatefulActor extends Actor {
- timeout = 100000
+ timeout = 5000
makeTransactionRequired
private lazy val mapState = TransactionalState.newMap[String, String]
@@ -86,8 +86,8 @@ class InMemFailerActor extends Actor {
}
class InMemoryActorTest extends JUnitSuite {
+ import Actor.Sender.Self
- /*
@Test
def shouldOneWayMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
@@ -98,7 +98,7 @@ class InMemoryActorTest extends JUnitSuite {
Thread.sleep(1000)
assert("new state" === (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
}
- */
+
@Test
def shouldMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
@@ -107,7 +107,7 @@ class InMemoryActorTest extends JUnitSuite {
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assert("new state" === (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
}
- /*
+
@Test
def shouldOneWayMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
@@ -120,7 +120,7 @@ class InMemoryActorTest extends JUnitSuite {
Thread.sleep(1000)
assert("init" === (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
}
- */
+
@Test
def shouldMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
@@ -134,7 +134,7 @@ class InMemoryActorTest extends JUnitSuite {
} catch {case e: RuntimeException => {}}
assert("init" === (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
}
- /*
+
@Test
def shouldOneWayVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
@@ -145,7 +145,7 @@ class InMemoryActorTest extends JUnitSuite {
Thread.sleep(1000)
assert(2 === (stateful !! GetVectorSize).get)
}
- */
+
@Test
def shouldVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
@@ -154,7 +154,7 @@ class InMemoryActorTest extends JUnitSuite {
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assert(2 === (stateful !! GetVectorSize).get)
}
- /*
+
@Test
def shouldOneWayVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
@@ -167,7 +167,7 @@ class InMemoryActorTest extends JUnitSuite {
Thread.sleep(1000)
assert(1 === (stateful !! GetVectorSize).get)
}
- */
+
@Test
def shouldVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
@@ -181,7 +181,7 @@ class InMemoryActorTest extends JUnitSuite {
} catch {case e: RuntimeException => {}}
assert(1 === (stateful !! GetVectorSize).get)
}
- /*
+
@Test
def shouldOneWayRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
@@ -192,7 +192,7 @@ class InMemoryActorTest extends JUnitSuite {
Thread.sleep(1000)
assert("new state" === (stateful !! GetRefState).get)
}
- */
+
@Test
def shouldRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
@@ -201,7 +201,7 @@ class InMemoryActorTest extends JUnitSuite {
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assert("new state" === (stateful !! GetRefState).get)
}
- /*
+
@Test
def shouldOneWayRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
@@ -212,9 +212,9 @@ class InMemoryActorTest extends JUnitSuite {
failer.start
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
Thread.sleep(1000)
- assert("init" === (stateful !! GetRefState).get) // check that state is == init state
+ assert("init" === (stateful !! (GetRefState, 1000000)).get) // check that state is == init state
}
- */
+
@Test
def shouldRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
diff --git a/akka-core/src/test/scala/PerformanceTest.scala b/akka-core/src/test/scala/PerformanceTest.scala
index d58d075202..778e4d45cd 100644
--- a/akka-core/src/test/scala/PerformanceTest.scala
+++ b/akka-core/src/test/scala/PerformanceTest.scala
@@ -1,4 +1,4 @@
-package test
+package se.scalablesolutions.akka
import org.scalatest.junit.JUnitSuite
import org.junit.Test
@@ -279,7 +279,7 @@ class PerformanceTest extends JUnitSuite {
var nrOfMessages = 2000000
var nrOfActors = 4
- var akkaTime = stressTestAkkaActors(nrOfMessages, nrOfActors, 1000 * 20)
+ var akkaTime = stressTestAkkaActors(nrOfMessages, nrOfActors, 1000 * 30)
var scalaTime = stressTestScalaActors(nrOfMessages, nrOfActors, 1000 * 40)
var ratio: Double = scalaTime.toDouble / akkaTime.toDouble
diff --git a/akka-core/src/test/scala/ShutdownSpec.scala b/akka-core/src/test/scala/ShutdownSpec.scala
index ba03fbe902..20927bbfb1 100644
--- a/akka-core/src/test/scala/ShutdownSpec.scala
+++ b/akka-core/src/test/scala/ShutdownSpec.scala
@@ -2,9 +2,8 @@ package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.actor.Actor
-object ActorShutdownSpec {
+object ActorShutdownRunner {
def main(args: Array[String]) {
-
class MyActor extends Actor {
def receive = {
case "test" => println("received test")
@@ -22,7 +21,7 @@ object ActorShutdownSpec {
// case 2
-object RemoteServerAndClusterShutdownSpec {
+object RemoteServerAndClusterShutdownRunner {
def main(args: Array[String]) {
val s1 = new RemoteServer
val s2 = new RemoteServer
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
index 8a51feed6b..366403ef46 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
@@ -11,7 +11,7 @@ import static se.scalablesolutions.akka.config.JavaConfig.*;
import se.scalablesolutions.akka.actor.*;
import se.scalablesolutions.akka.Kernel;
import junit.framework.TestCase;
-/*
+
public class InMemNestedStateTest extends TestCase {
static String messageLog = "";
@@ -133,4 +133,3 @@ public class InMemNestedStateTest extends TestCase {
assertEquals("init", nested.getRefState()); // check that state is == init state
}
}
-*/
\ No newline at end of file
diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala
index 406c914577..ae6068b9c0 100644
--- a/akka-kernel/src/main/scala/Kernel.scala
+++ b/akka-kernel/src/main/scala/Kernel.scala
@@ -80,7 +80,7 @@ object Kernel extends Logging {
(____ /__|_ \__|_ \(____ /
\/ \/ \/ \/
""")
- log.info(" Running version %s", Config.VERSION)
+ log.info(" Running version %s", Config.VERSION)
log.info("==============================")
}
}
diff --git a/akka-patterns/src/main/scala/Agent.scala b/akka-patterns/src/main/scala/Agent.scala
index 4dd8640c32..aea74530a3 100644
--- a/akka-patterns/src/main/scala/Agent.scala
+++ b/akka-patterns/src/main/scala/Agent.scala
@@ -50,30 +50,30 @@ sealed class Agent[T] private (initialValue: T) extends Actor {
* Periodically handles incoming messages
*/
def receive = {
- case FunctionHolder(fun: (T => T)) => atomic { updateData(fun(value.getOrWait)) }
+ case FunctionHolder(fun: (T => T)) => updateData(fun(value.getOrWait))
case ValueHolder(x: T) => updateData(x)
- case ProcedureHolder(fun: (T => Unit)) => atomic { fun(copyStrategy(value.getOrWait)) }
+ case ProcedureHolder(fun: (T => Unit)) => fun(copyStrategy(value.getOrWait))
}
/**
* Specifies how a copy of the value is made, defaults to using identity
*/
- protected def copyStrategy(t : T) : T = t
+ protected def copyStrategy(t: T): T = t
/**
* Updates the internal state with the value provided as a by-name parameter
*/
- private final def updateData(newData: => T) : Unit = atomic { value.swap(newData) }
+ private final def updateData(newData: => T): Unit = value.swap(newData)
/**
* Submits a request to read the internal state.
* A copy of the internal state will be returned, depending on the underlying effective copyStrategy.
* Internally leverages the asynchronous getValue() method and then waits for its result on a CountDownLatch.
*/
- final def get : T = {
+ final def get: T = {
val ref = new AtomicReference[T]
val latch = new CountDownLatch(1)
get((x: T) => {ref.set(x); latch.countDown})
@@ -85,14 +85,14 @@ sealed class Agent[T] private (initialValue: T) extends Actor {
* Asynchronously submits a request to read the internal state. The supplied function will be executed on the returned internal state value.
* A copy of the internal state will be used, depending on the underlying effective copyStrategy.
*/
- final def get(message: (T => Unit)) : Unit = this ! ProcedureHolder(message)
+ final def get(message: (T => Unit)): Unit = this ! ProcedureHolder(message)
/**
* Submits a request to read the internal state.
* A copy of the internal state will be returned, depending on the underlying effective copyStrategy.
* Internally leverages the asynchronous getValue() method and then waits for its result on a CountDownLatch.
*/
- final def apply() : T = get
+ final def apply(): T = get
/**
* Asynchronously submits a request to read the internal state. The supplied function will be executed on the returned internal state value.
@@ -103,22 +103,22 @@ sealed class Agent[T] private (initialValue: T) extends Actor {
/**
* Submits the provided function for execution against the internal agent's state
*/
- final def apply(message: (T => T)) : Unit = this ! FunctionHolder(message)
+ final def apply(message: (T => T)): Unit = this ! FunctionHolder(message)
/**
* Submits a new value to be set as the new agent's internal state
*/
- final def apply(message: T) : Unit = this ! ValueHolder(message)
+ final def apply(message: T): Unit = this ! ValueHolder(message)
/**
* Submits the provided function for execution against the internal agent's state
*/
- final def update(message: (T => T)) : Unit = this ! FunctionHolder(message)
+ final def update(message: (T => T)): Unit = this ! FunctionHolder(message)
/**
* Submits a new value to be set as the new agent's internal state
*/
- final def update(message: T) : Unit = this ! ValueHolder(message)
+ final def update(message: T): Unit = this ! ValueHolder(message)
}
/**
@@ -135,7 +135,7 @@ object Agent {
/**
* Creates a new Agent of type T with the initial value of value
*/
- def apply[T](value:T) : Agent[T] = new Agent(value)
+ def apply[T](value:T): Agent[T] = new Agent(value)
/**
* Creates a new Agent of type T with the initial value of value and with the specified copy function
diff --git a/akka-patterns/src/main/scala/Patterns.scala b/akka-patterns/src/main/scala/Patterns.scala
index b967c07df7..9b7e55ccc9 100644
--- a/akka-patterns/src/main/scala/Patterns.scala
+++ b/akka-patterns/src/main/scala/Patterns.scala
@@ -3,14 +3,14 @@ package se.scalablesolutions.akka.actor.patterns
import se.scalablesolutions.akka.actor.Actor
object Patterns {
- type PF[A,B] = PartialFunction[A,B]
+ type PF[A, B] = PartialFunction[A, B]
/**
* Creates a new PartialFunction whose isDefinedAt is a combination
* of the two parameters, and whose apply is first to call filter.apply and then filtered.apply
*/
- def filter[A,B](filter : PF[A,Unit],filtered : PF[A,B]) : PF[A,B] = {
- case a : A if filtered.isDefinedAt(a) && filter.isDefinedAt(a) =>
+ def filter[A, B](filter: PF[A, Unit], filtered: PF[A, B]): PF[A, B] = {
+ case a: A if filtered.isDefinedAt(a) && filter.isDefinedAt(a) =>
filter(a)
filtered(a)
}
@@ -18,39 +18,42 @@ object Patterns {
/**
* Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true
*/
- def intercept[A,B](interceptor : (A) => Unit, interceptee : PF[A,B]) : PF[A,B] = filter(
- { case a if a.isInstanceOf[A] => interceptor(a) },
- interceptee
+ def intercept[A, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] = filter(
+ {case a if a.isInstanceOf[A] => interceptor(a)},
+ interceptee
)
-
+
//FIXME 2.8, use default params with CyclicIterator
- def loadBalancerActor(actors : => InfiniteIterator[Actor]) : Actor = new Actor with LoadBalancer {
+ def loadBalancerActor(actors: => InfiniteIterator[Actor]): Actor = new Actor with LoadBalancer {
val seq = actors
}
- def dispatcherActor(routing : PF[Any,Actor], msgTransformer : (Any) => Any) : Actor = new Actor with Dispatcher {
- override def transform(msg : Any) = msgTransformer(msg)
+ def dispatcherActor(routing: PF[Any, Actor], msgTransformer: (Any) => Any): Actor = new Actor with Dispatcher {
+ override def transform(msg: Any) = msgTransformer(msg)
+
def routes = routing
}
-
- def dispatcherActor(routing : PF[Any,Actor]) : Actor = new Actor with Dispatcher {
- def routes = routing
+
+ def dispatcherActor(routing: PF[Any, Actor]): Actor = new Actor with Dispatcher {
+ def routes = routing
}
- def loggerActor(actorToLog : Actor, logger : (Any) => Unit) : Actor = dispatcherActor (
- { case _ => actorToLog },
+ def loggerActor(actorToLog: Actor, logger: (Any) => Unit): Actor = dispatcherActor(
+ {case _ => actorToLog},
logger
- )
+ )
}
-trait Dispatcher { self : Actor =>
+trait Dispatcher {
+ self: Actor =>
- protected def transform(msg : Any) : Any = msg
- protected def routes : PartialFunction[Any,Actor]
-
- protected def dispatch : PartialFunction[Any,Unit] = {
+ protected def transform(msg: Any): Any = msg
+
+ protected def routes: PartialFunction[Any, Actor]
+
+ protected def dispatch: PartialFunction[Any, Unit] = {
case a if routes.isDefinedAt(a) => {
- if(self.sender.isDefined)
+ if (self.sender.isDefined)
routes(a) forward transform(a)
else
routes(a) send transform(a)
@@ -60,19 +63,22 @@ trait Dispatcher { self : Actor =>
def receive = dispatch
}
-trait LoadBalancer extends Dispatcher { self : Actor =>
- protected def seq : InfiniteIterator[Actor]
+trait LoadBalancer extends Dispatcher {
+ self: Actor =>
+ protected def seq: InfiniteIterator[Actor]
- protected def routes = { case x if seq.hasNext => seq.next }
+ protected def routes = {case x if seq.hasNext => seq.next}
}
trait InfiniteIterator[T] extends Iterator[T]
-class CyclicIterator[T](items : List[T]) extends InfiniteIterator[T] {
- @volatile private[this] var current : List[T] = items
+class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
+ @volatile private[this] var current: List[T] = items
+
def hasNext = items != Nil
+
def next = {
- val nc = if(current == Nil) items else current
+ val nc = if (current == Nil) items else current
current = nc.tail
nc.head
}
diff --git a/akka-patterns/src/test/scala/ActorPatternsTest.scala b/akka-patterns/src/test/scala/ActorPatternsTest.scala
index 11f2664640..ae6ae5c0e8 100644
--- a/akka-patterns/src/test/scala/ActorPatternsTest.scala
+++ b/akka-patterns/src/test/scala/ActorPatternsTest.scala
@@ -21,12 +21,12 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat
val (testMsg1,testMsg2,testMsg3,testMsg4) = ("test1","test2","test3","test4")
var targetOk = 0
- val t1 = actor() receive {
+ val t1: Actor = actor {
case `testMsg1` => targetOk += 2
case `testMsg2` => targetOk += 4
}
- val t2 = actor() receive {
+ val t2: Actor = actor {
case `testMsg3` => targetOk += 8
}
@@ -48,7 +48,7 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat
@Test def testLogger = verify(new TestActor {
def test = {
val msgs = new HashSet[Any]
- val t1 = actor() receive {
+ val t1: Actor = actor {
case _ =>
}
val l = loggerActor(t1,(x) => msgs += x)
diff --git a/akka-patterns/src/test/scala/AgentTest.scala b/akka-patterns/src/test/scala/AgentTest.scala
index 17ccce8e0a..013cd13ada 100644
--- a/akka-patterns/src/test/scala/AgentTest.scala
+++ b/akka-patterns/src/test/scala/AgentTest.scala
@@ -7,18 +7,23 @@ import org.scalatest.junit.JUnitRunner
import org.scalatest.matchers.MustMatchers
import org.junit.{Test}
+/*
@RunWith(classOf[JUnitRunner])
class AgentTest extends junit.framework.TestCase with Suite with MustMatchers with ActorTestUtil with Logging {
- @Test def testAgent = verify(new TestActor {
- def test = {
- val t = Agent(5)
- handle(t){
- t.update( _ + 1 )
- t.update( _ * 2 )
- val r = t()
- r must be (12)
- }
- }
+ @Test def testAgent = verify(new TestActor {
+ def test = {
+ atomic {
+ val t = Agent(5)
+ handle(t) {
+ t.update(_ + 1)
+ t.update(_ * 2)
+
+ val r = t()
+ r must be(12)
+ }
+ }
+ }
})
-}
\ No newline at end of file
+}
+*/
diff --git a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala
index 0e232f5ce9..a7fed923eb 100644
--- a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala
+++ b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala
@@ -1,13 +1,9 @@
package se.scalablesolutions.akka.state
-import se.scalablesolutions.akka.actor.Actor
-
-import junit.framework.TestCase
+import se.scalablesolutions.akka.actor.{Actor, Transactor}
import org.junit.Test
import org.junit.Assert._
-import org.apache.cassandra.service.CassandraDaemon
-import org.junit.BeforeClass
import org.junit.Before
import org.scalatest.junit.JUnitSuite
@@ -28,9 +24,8 @@ case class SetRefStateOneWay(key: String)
case class SuccessOneWay(key: String, value: String)
case class FailureOneWay(key: String, value: String, failer: Actor)
-class CassandraPersistentActor extends Actor {
+class CassandraPersistentActor extends Transactor {
timeout = 100000
- makeTransactionRequired
private lazy val mapState = CassandraStorage.newMap
private lazy val vectorState = CassandraStorage.newVector
@@ -66,8 +61,7 @@ class CassandraPersistentActor extends Actor {
}
}
-@serializable class PersistentFailerActor extends Actor {
- makeTransactionRequired
+@serializable class PersistentFailerActor extends Transactor {
def receive = {
case "Failure" =>
throw new RuntimeException("expected")
@@ -76,8 +70,8 @@ class CassandraPersistentActor extends Actor {
class CassandraPersistentActorSpec extends JUnitSuite {
- @Before
- def startCassandra = EmbeddedCassandraService.start
+ //@Before
+ //def startCassandra = EmbeddedCassandraService.start
@Test
def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
index dc55e0eca1..f52841b817 100644
--- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
+++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
@@ -4,14 +4,15 @@
package se.scalablesolutions.akka.state
-import se.scalablesolutions.akka.stm.TransactionManagement.currentTransaction
+import se.scalablesolutions.akka.stm.TransactionManagement.transaction
import se.scalablesolutions.akka.collection._
import se.scalablesolutions.akka.util.Logging
-import org.codehaus.aspectwerkz.proxy.Uuid
-
+// FIXME move to 'stm' package + add message with more info
class NoTransactionInScopeException extends RuntimeException
+class StorageException(message: String) extends RuntimeException(message)
+
/**
* Example Scala usage.
*
@@ -64,10 +65,6 @@ trait Storage {
throw new UnsupportedOperationException
}
-
-
-
-
/**
* Implementation of PersistentMap for every concrete
* storage will have the same workflow. This abstracts the workflow.
@@ -162,8 +159,8 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
}
private def register = {
- if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
- currentTransaction.get.get.register(uuid, this)
+ if (transaction.get.isEmpty) throw new NoTransactionInScopeException
+ transaction.get.get.register(uuid, this)
}
}
@@ -236,8 +233,8 @@ trait PersistentVector[T] extends RandomAccessSeq[T] with Transactional with Com
def length: Int = storage.getVectorStorageSizeFor(uuid) + newElems.length
private def register = {
- if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
- currentTransaction.get.get.register(uuid, this)
+ if (transaction.get.isEmpty) throw new NoTransactionInScopeException
+ transaction.get.get.register(uuid, this)
}
}
@@ -272,8 +269,8 @@ trait PersistentRef[T] extends Transactional with Committable {
}
private def register = {
- if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
- currentTransaction.get.get.register(uuid, this)
+ if (transaction.get.isEmpty) throw new NoTransactionInScopeException
+ transaction.get.get.register(uuid, this)
}
}
@@ -397,7 +394,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
throw new UnsupportedOperationException("dequeueAll not supported")
private def register = {
- if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
- currentTransaction.get.get.register(uuid, this)
+ if (transaction.get.isEmpty) throw new NoTransactionInScopeException
+ transaction.get.get.register(uuid, this)
}
}
diff --git a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala
index 8681ebadb9..8ad5d94355 100644
--- a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala
+++ b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala
@@ -8,7 +8,7 @@ import org.junit.Assert._
import _root_.dispatch.json.{JsNumber, JsValue}
import _root_.dispatch.json.Js._
-import se.scalablesolutions.akka.actor.Actor
+import se.scalablesolutions.akka.actor.{Transactor, Actor}
/**
* A persistent actor based on MongoDB storage.
@@ -29,10 +29,10 @@ case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: Actor)
case class Credit(accountNo: String, amount: BigInt)
case object LogSize
-class BankAccountActor extends Actor {
- makeTransactionRequired
- private val accountState = MongoStorage.newMap
- private val txnLog = MongoStorage.newVector
+class BankAccountActor extends Transactor {
+
+ private lazy val accountState = MongoStorage.newMap
+ private lazy val txnLog = MongoStorage.newVector
def receive: PartialFunction[Any, Unit] = {
// check balance
@@ -91,8 +91,7 @@ class BankAccountActor extends Actor {
}
}
-@serializable class PersistentFailerActor extends Actor {
- makeTransactionRequired
+@serializable class PersistentFailerActor extends Transactor {
def receive = {
case "Failure" =>
throw new RuntimeException("expected")
diff --git a/akka-persistence/akka-persistence-redis/pom.xml b/akka-persistence/akka-persistence-redis/pom.xml
index c6088e573b..112d4764cb 100644
--- a/akka-persistence/akka-persistence-redis/pom.xml
+++ b/akka-persistence/akka-persistence-redis/pom.xml
@@ -24,7 +24,7 @@
com.redis
redisclient
- 1.0.1
+ 1.1
diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala
index 00a44d0513..be214087f3 100644
--- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala
+++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala
@@ -15,10 +15,10 @@ trait Encoder {
}
trait CommonsCodecBase64 {
- val base64 = new org.apache.commons.codec.binary.Base64
-
- def encode(bytes: Array[Byte]): Array[Byte] = base64.encode(bytes)
- def decode(bytes: Array[Byte]): Array[Byte] = base64.decode(bytes)
+ import org.apache.commons.codec.binary.Base64._
+
+ def encode(bytes: Array[Byte]): Array[Byte] = encodeBase64(bytes)
+ def decode(bytes: Array[Byte]): Array[Byte] = decodeBase64(bytes)
}
object Base64Encoder extends Encoder with CommonsCodecBase64
@@ -45,7 +45,7 @@ private [akka] object RedisStorageBackend extends
val REDIS_SERVER_HOSTNAME = config.getString("akka.storage.redis.hostname", "127.0.0.1")
val REDIS_SERVER_PORT = config.getInt("akka.storage.redis.port", 6379)
- val db = new Redis(REDIS_SERVER_HOSTNAME, REDIS_SERVER_PORT)
+ val db = new RedisClient(REDIS_SERVER_HOSTNAME, REDIS_SERVER_PORT)
/**
* Map storage in Redis.
@@ -72,11 +72,11 @@ private [akka] object RedisStorageBackend extends
* base64(T1):base64("debasish.programming_language") -> "scala"
*
*/
- def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) {
+ def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]): Unit = withErrorHandling {
insertMapStorageEntriesFor(name, List((key, value)))
}
- def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]) {
+ def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]): Unit = withErrorHandling {
mset(entries.map(e =>
(makeRedisKey(name, e._1), new String(e._2))))
}
@@ -89,22 +89,22 @@ private [akka] object RedisStorageBackend extends
* : is chosen since it cannot appear in base64 encoding charset
* both parts of the key need to be based64 encoded since there can be spaces within each of them
*/
- private [this] def makeRedisKey(name: String, key: Array[Byte]): String = {
+ private [this] def makeRedisKey(name: String, key: Array[Byte]): String = withErrorHandling {
"%s:%s".format(new String(encode(name.getBytes)), new String(encode(key)))
}
- private [this] def makeKeyFromRedisKey(redisKey: String) = {
+ private [this] def makeKeyFromRedisKey(redisKey: String) = withErrorHandling {
val nk = redisKey.split(':').map{e: String => decode(e.getBytes)}
(nk(0), nk(1))
}
- private [this] def mset(entries: List[(String, String)]) {
+ private [this] def mset(entries: List[(String, String)]): Unit = withErrorHandling {
entries.foreach {e: (String, String) =>
db.set(e._1, e._2)
}
}
- def removeMapStorageFor(name: String): Unit = {
+ def removeMapStorageFor(name: String): Unit = withErrorHandling {
db.keys("%s:*".format(encode(name.getBytes))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
@@ -113,18 +113,19 @@ private [akka] object RedisStorageBackend extends
}
}
- def removeMapStorageFor(name: String, key: Array[Byte]): Unit = {
+ def removeMapStorageFor(name: String, key: Array[Byte]): Unit = withErrorHandling {
db.delete(makeRedisKey(name, key))
}
- def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] =
+ def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = withErrorHandling {
db.get(makeRedisKey(name, key)) match {
case None =>
throw new Predef.NoSuchElementException(new String(key) + " not present")
case Some(s) => Some(s.getBytes)
}
+ }
- def getMapStorageSizeFor(name: String): Int = {
+ def getMapStorageSizeFor(name: String): Int = withErrorHandling {
db.keys("%s:*".format(new String(encode(name.getBytes)))) match {
case None => 0
case Some(keys) =>
@@ -132,7 +133,7 @@ private [akka] object RedisStorageBackend extends
}
}
- def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = {
+ def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = withErrorHandling {
db.keys("%s:*".format(new String(encode(name.getBytes)))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
@@ -143,7 +144,7 @@ private [akka] object RedisStorageBackend extends
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]],
finish: Option[Array[Byte]],
- count: Int): List[(Array[Byte], Array[Byte])] = {
+ count: Int): List[(Array[Byte], Array[Byte])] = withErrorHandling {
import scala.collection.immutable.TreeMap
val wholeSorted =
@@ -188,27 +189,27 @@ private [akka] object RedisStorageBackend extends
}
}
- def insertVectorStorageEntryFor(name: String, element: Array[Byte]) {
- db.pushHead(new String(encode(name.getBytes)), new String(element))
+ def insertVectorStorageEntryFor(name: String, element: Array[Byte]): Unit = withErrorHandling {
+ db.lpush(new String(encode(name.getBytes)), new String(element))
}
- def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) {
+ def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]): Unit = withErrorHandling {
elements.foreach(insertVectorStorageEntryFor(name, _))
}
- def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) {
- db.listSet(new String(encode(name.getBytes)), index, new String(elem))
+ def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]): Unit = withErrorHandling {
+ db.lset(new String(encode(name.getBytes)), index, new String(elem))
}
- def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
- db.listIndex(new String(encode(name.getBytes)), index) match {
+ def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = withErrorHandling {
+ db.lindex(new String(encode(name.getBytes)), index) match {
case None =>
throw new Predef.NoSuchElementException(name + " does not have element at " + index)
case Some(e) => e.getBytes
}
}
- def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = {
+ def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = withErrorHandling {
/**
* count is the max number of results to return. Start with
* start or 0 (if start is not defined) and go until
@@ -221,27 +222,27 @@ private [akka] object RedisStorageBackend extends
if (f >= s) Math.min(count, (f - s)) else count
}
else count
- db.listRange(new String(encode(name.getBytes)), s, s + cnt - 1) match {
+ db.lrange(new String(encode(name.getBytes)), s, s + cnt - 1) match {
case None =>
throw new Predef.NoSuchElementException(name + " does not have elements in the range specified")
case Some(l) =>
- l map (_.getBytes)
+ l map (_.get.getBytes)
}
}
def getVectorStorageSizeFor(name: String): Int = {
- db.listLength(new String(encode(name.getBytes))) match {
+ db.llen(new String(encode(name.getBytes))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(l) => l
}
}
- def insertRefStorageFor(name: String, element: Array[Byte]) {
+ def insertRefStorageFor(name: String, element: Array[Byte]): Unit = withErrorHandling {
db.set(new String(encode(name.getBytes)), new String(element))
}
- def getRefStorageFor(name: String): Option[Array[Byte]] = {
+ def getRefStorageFor(name: String): Option[Array[Byte]] = withErrorHandling {
db.get(new String(encode(name.getBytes))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
@@ -250,13 +251,14 @@ private [akka] object RedisStorageBackend extends
}
// add to the end of the queue
- def enqueue(name: String, item: Array[Byte]): Boolean = {
- db.pushTail(new String(encode(name.getBytes)), new String(item))
+ def enqueue(name: String, item: Array[Byte]): Boolean = withErrorHandling {
+ db.rpush(new String(encode(name.getBytes)), new String(item))
}
+
// pop from the front of the queue
- def dequeue(name: String): Option[Array[Byte]] = {
- db.popHead(new String(encode(name.getBytes))) match {
+ def dequeue(name: String): Option[Array[Byte]] = withErrorHandling {
+ db.lpop(new String(encode(name.getBytes))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(s) =>
@@ -265,8 +267,8 @@ private [akka] object RedisStorageBackend extends
}
// get the size of the queue
- def size(name: String): Int = {
- db.listLength(new String(encode(name.getBytes))) match {
+ def size(name: String): Int = withErrorHandling {
+ db.llen(new String(encode(name.getBytes))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(l) => l
@@ -275,64 +277,86 @@ private [akka] object RedisStorageBackend extends
// return an array of items currently stored in the queue
// start is the item to begin, count is how many items to return
- def peek(name: String, start: Int, count: Int): List[Array[Byte]] = count match {
- case 1 =>
- db.listIndex(new String(encode(name.getBytes)), start) match {
- case None =>
- throw new Predef.NoSuchElementException("No element at " + start)
- case Some(s) =>
- List(s.getBytes)
- }
- case n =>
- db.listRange(new String(encode(name.getBytes)), start, start + count - 1) match {
- case None =>
- throw new Predef.NoSuchElementException(
- "No element found between " + start + " and " + (start + count - 1))
- case Some(es) =>
- es.map(_.getBytes)
- }
+ def peek(name: String, start: Int, count: Int): List[Array[Byte]] = withErrorHandling {
+ count match {
+ case 1 =>
+ db.lindex(new String(encode(name.getBytes)), start) match {
+ case None =>
+ throw new Predef.NoSuchElementException("No element at " + start)
+ case Some(s) =>
+ List(s.getBytes)
+ }
+ case n =>
+ db.lrange(new String(encode(name.getBytes)), start, start + count - 1) match {
+ case None =>
+ throw new Predef.NoSuchElementException(
+ "No element found between " + start + " and " + (start + count - 1))
+ case Some(es) =>
+ es.map(_.get.getBytes)
+ }
+ }
}
// completely delete the queue
- def remove(name: String): Boolean = {
- db.delete(new String(encode(name.getBytes)))
+ def remove(name: String): Boolean = withErrorHandling {
+ db.delete(new String(encode(name.getBytes))) match {
+ case Some(1) => true
+ case _ => false
+ }
}
// add item to sorted set identified by name
- def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = {
- db.zAdd(new String(encode(name.getBytes)), zscore, new String(item))
+ def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = withErrorHandling {
+ db.zadd(new String(encode(name.getBytes)), zscore, new String(item)) match {
+ case Some(1) => true
+ case _ => false
+ }
}
// remove item from sorted set identified by name
- def zrem(name: String, item: Array[Byte]): Boolean = {
- db.zRem(new String(encode(name.getBytes)), new String(item))
+ def zrem(name: String, item: Array[Byte]): Boolean = withErrorHandling {
+ db.zrem(new String(encode(name.getBytes)), new String(item)) match {
+ case Some(1) => true
+ case _ => false
+ }
}
// cardinality of the set identified by name
- def zcard(name: String): Int = {
- db.zCard(new String(encode(name.getBytes))) match {
+ def zcard(name: String): Int = withErrorHandling {
+ db.zcard(new String(encode(name.getBytes))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(l) => l
}
}
- def zscore(name: String, item: Array[Byte]): String = {
- db.zScore(new String(encode(name.getBytes)), new String(item)) match {
+ def zscore(name: String, item: Array[Byte]): String = withErrorHandling {
+ db.zscore(new String(encode(name.getBytes)), new String(item)) match {
case None =>
throw new Predef.NoSuchElementException(new String(item) + " not present")
case Some(s) => s
}
}
- def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = {
- db.zRange(new String(encode(name.getBytes)), start.toString, end.toString, SocketOperations.ASC, false) match {
+ def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = withErrorHandling {
+ db.zrange(new String(encode(name.getBytes)), start.toString, end.toString, RedisClient.ASC, false) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(s) =>
- s.map(_.getBytes)
+ s.map(_.get.getBytes)
}
}
- def flushDB = db.flushDb
+ def flushDB = withErrorHandling(db.flushDb)
+
+ private def withErrorHandling[T](body: => T): T = {
+ try {
+ body
+ } catch {
+ case e: java.lang.NullPointerException =>
+ throw new StorageException("Could not connect to Redis server")
+ case e =>
+ throw new StorageException("Error in Redis: " + e.getMessage)
+ }
+ }
}
diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
index 86d4384b70..8c91f0ff61 100644
--- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
+++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
@@ -29,6 +29,7 @@ case object LogSize
class AccountActor extends Transactor {
private lazy val accountState = RedisStorage.newMap
private lazy val txnLog = RedisStorage.newVector
+ //timeout = 5000
def receive = {
// check balance
@@ -86,6 +87,7 @@ class AccountActor extends Transactor {
}
@serializable class PersistentFailerActor extends Transactor {
+ //timeout = 5000
def receive = {
case "Failure" =>
throw new RuntimeException("expected")
@@ -138,7 +140,7 @@ class RedisPersistentActorSpec extends TestCase {
bactor.start
bactor !! Credit("a-123", 5000)
- assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get)
+ assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get)
val failer = new PersistentFailerActor
failer.start
@@ -147,7 +149,7 @@ class RedisPersistentActorSpec extends TestCase {
fail("should throw exception")
} catch { case e: RuntimeException => {}}
- assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get)
+ assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get)
// should not count the failed one
assertEquals(3, (bactor !! LogSize).get)
diff --git a/akka-util-java/pom.xml b/akka-util-java/pom.xml
index e0a729491b..6db055b223 100644
--- a/akka-util-java/pom.xml
+++ b/akka-util-java/pom.xml
@@ -24,34 +24,6 @@
protobuf-java
2.2.0
-