diff --git a/akka-actor/src/main/scala/actor/Actor.scala b/akka-actor/src/main/scala/actor/Actor.scala index dc74028964..6667f7fbc4 100644 --- a/akka-actor/src/main/scala/actor/Actor.scala +++ b/akka-actor/src/main/scala/actor/Actor.scala @@ -17,17 +17,6 @@ import scala.reflect.BeanProperty import akka.util. {ReflectiveAccess, Logging, Duration} import akka.japi.Procedure -/** - * Implements the Transactor abstraction. E.g. a transactional actor. - *
- * Equivalent to invoking themakeTransactionRequired method in the body of the ActorJonas Bonér
- */
-trait Transactor extends Actor {
- self.makeTransactionRequired
-}
-
/**
* Extend this abstract class to create a remote actor.
*
@@ -246,7 +235,7 @@ object Actor extends Logging {
* Here you find functions like:
* - !, !!, !!! and forward
* - link, unlink, startLink, spawnLink etc
- * - makeTransactional, makeRemote etc.
+ * - makeRemote etc.
* - start, stop
* - etc.
*
diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala
index 09477e9c03..adf7a1cde7 100644
--- a/akka-actor/src/main/scala/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/actor/ActorRef.scala
@@ -7,17 +7,10 @@ package akka.actor
import akka.dispatch._
import akka.config.Config._
import akka.config.Supervision._
-import akka.stm.global._
-import akka.stm.TransactionManagement._
-import akka.stm.{ TransactionManagement, TransactionSetAbortedException }
import akka.AkkaException
import akka.util._
import ReflectiveAccess._
-import org.multiverse.api.ThreadLocalTransaction._
-import org.multiverse.commitbarriers.CountDownCommitBarrier
-import org.multiverse.api.exceptions.DeadTransactionException
-
import java.net.InetSocketAddress
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import java.util.concurrent.locks.ReentrantLock
@@ -39,10 +32,6 @@ private[akka] object ActorRefInternals extends Logging {
object RUNNING extends StatusType
object BEING_RESTARTED extends StatusType
object SHUTDOWN extends StatusType
-
- case class TransactorConfig(factory: Option[TransactionFactory] = None, config: TransactionConfig = DefaultGlobalTransactionConfig)
- val DefaultTransactorConfig = TransactorConfig()
- val NoTransactionConfig = TransactorConfig()
}
@@ -78,7 +67,7 @@ private[akka] object ActorRefInternals extends Logging {
*
* @author Jonas Bonér
*/
-trait ActorRef extends ActorRefShared with TransactionManagement with java.lang.Comparable[ActorRef] { scalaRef: ScalaActorRef =>
+trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scalaRef: ScalaActorRef =>
//Reuse same logger
import Actor.log
@@ -174,23 +163,6 @@ trait ActorRef extends ActorRefShared with TransactionManagement with java.lang.
@volatile
protected[akka] var hotswap = Stack[PartialFunction[Any, Unit]]()
- /**
- * User overridable callback/setting.
- *
- * Set to true if messages should have REQUIRES_NEW semantics, e.g. a new transaction should
- * start if there is no one running, else it joins the existing transaction.
- */
- @volatile
- protected[akka] var transactorConfig = ActorRefInternals.NoTransactionConfig
-
- /**
- * Returns true if this Actor is a Transactor
- */
- def isTransactor: Boolean = {
- val c = transactorConfig
- (c ne ActorRefInternals.NoTransactionConfig) && (c ne null) //Could possibly be null if called before var init
- }
-
/**
* This is a reference to the message currently being processed by the actor
*/
@@ -412,34 +384,6 @@ trait ActorRef extends ActorRefShared with TransactionManagement with java.lang.
*/
def makeRemote(address: InetSocketAddress): Unit
- /**
- * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists.
- * However, it will always participate in an existing transaction.
- */
- def makeTransactionRequired(): Unit
-
- /**
- * Sets the transaction configuration for this actor. Needs to be invoked before the actor is started.
- */
- def transactionConfig_=(config: TransactionConfig): Unit
-
- /**
- * Akka Java API
- * Sets the transaction configuration for this actor. Needs to be invoked before the actor is started.
- */
- def setTransactionConfig(config: TransactionConfig): Unit = transactionConfig = config
-
- /**
- * Get the transaction configuration for this actor.
- */
- def transactionConfig: TransactionConfig
-
- /**
- * Akka Java API
- * Get the transaction configuration for this actor.
- */
- def getTransactionConfig(): TransactionConfig = transactionConfig
-
/**
* Returns the home address and port for this actor.
*/
@@ -664,7 +608,6 @@ class LocalActorRef private[akka] (
__id: String,
__hostname: String,
__port: Int,
- __isTransactor: Boolean,
__timeout: Long,
__receiveTimeout: Option[Long],
__lifeCycle: LifeCycle,
@@ -675,7 +618,6 @@ class LocalActorRef private[akka] (
_uuid = __uuid
id = __id
homeAddress = (__hostname, __port)
- transactorConfig = if (__isTransactor) ActorRefInternals.DefaultTransactorConfig else ActorRefInternals.NoTransactionConfig
timeout = __timeout
receiveTimeout = __receiveTimeout
lifeCycle = __lifeCycle
@@ -737,33 +679,6 @@ class LocalActorRef private[akka] (
"Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
}
- /**
- * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists.
- * However, it will always participate in an existing transaction.
- */
- def makeTransactionRequired() = guard.withGuard {
- if (!isRunning || isBeingRestarted) {
- if (transactorConfig eq ActorRefInternals.NoTransactionConfig)
- transactorConfig = ActorRefInternals.DefaultTransactorConfig
- }
- else throw new ActorInitializationException(
- "Can not make actor transaction required after it has been started")
- }
-
- /**
- * Sets the transaction configuration for this actor. Needs to be invoked before the actor is started.
- */
- def transactionConfig_=(configuration: TransactionConfig) = guard.withGuard {
- if (!isRunning || isBeingRestarted) transactorConfig = transactorConfig.copy(config = configuration)
- else throw new ActorInitializationException(
- "Cannot set transaction configuration for actor after it has been started")
- }
-
- /**
- * Get the transaction configuration for this actor.
- */
- def transactionConfig: TransactionConfig = transactorConfig.config
-
/**
* Set the contact address for this actor. This is used for replying to messages
* sent asynchronously when no reply channel exists.
@@ -784,8 +699,6 @@ class LocalActorRef private[akka] (
"Can't restart an actor that has been shut down with 'stop' or 'exit'")
if (!isRunning) {
dispatcher.attach(this)
- if (isTransactor)
- transactorConfig = transactorConfig.copy(factory = Some(TransactionFactory(transactorConfig.config, id)))
_status = ActorRefInternals.RUNNING
@@ -806,7 +719,6 @@ class LocalActorRef private[akka] (
receiveTimeout = None
cancelReceiveTimeout
dispatcher.detach(this)
- transactorConfig = transactorConfig.copy(factory = None)
_status = ActorRefInternals.SHUTDOWN
actor.postStop
ActorRegistry.unregister(this)
@@ -948,13 +860,11 @@ class LocalActorRef private[akka] (
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
- joinTransaction(message)
-
if (remoteAddress.isDefined && isRemotingEnabled) {
RemoteClientModule.send[Any](
message, senderOption, None, remoteAddress.get, timeout, true, this, None, ActorType.ScalaActor)
} else {
- val invocation = new MessageInvocation(this, message, senderOption, None, transactionSet.get)
+ val invocation = new MessageInvocation(this, message, senderOption, None)
dispatcher dispatchMessage invocation
}
}
@@ -964,7 +874,6 @@ class LocalActorRef private[akka] (
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
- joinTransaction(message)
if (remoteAddress.isDefined && isRemotingEnabled) {
val future = RemoteClientModule.send[T](
@@ -974,7 +883,7 @@ class LocalActorRef private[akka] (
} else {
val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout))
val invocation = new MessageInvocation(
- this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]], transactionSet.get)
+ this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]])
dispatcher dispatchMessage invocation
future.get
}
@@ -1151,61 +1060,18 @@ class LocalActorRef private[akka] (
a
}
- private def joinTransaction(message: Any) = if (isTransactionSetInScope) {
- import org.multiverse.api.ThreadLocalTransaction
- val oldTxSet = getTransactionSetInScope
- val currentTxSet = if (oldTxSet.isAborted || oldTxSet.isCommitted) {
- clearTransactionSet
- createNewTransactionSet
- } else oldTxSet
- Actor.log.trace("Joining transaction set [" + currentTxSet +
- "];\n\tactor " + toString +
- "\n\twith message [" + message + "]")
- val mtx = ThreadLocalTransaction.getThreadLocalTransaction
- if ((mtx eq null) || mtx.getStatus.isDead) currentTxSet.incParties
- else currentTxSet.incParties(mtx, 1)
- }
-
private def dispatch[T](messageHandle: MessageInvocation) = {
Actor.log.trace("Invoking actor with message: %s\n", messageHandle)
val message = messageHandle.message //serializeMessage(messageHandle.message)
- val isXactor = isTransactor
- var topLevelTransaction = false
- val txSet: Option[CountDownCommitBarrier] =
- if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet
- else {
- topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx
- if (isXactor) {
- Actor.log.trace("Creating a new transaction set (top-level transaction)\n\tfor actor " + toString +
- "\n\twith message " + messageHandle)
- Some(createNewTransactionSet)
- } else None
- }
- setTransactionSet(txSet)
try {
cancelReceiveTimeout // FIXME: leave this here?
- if (isXactor) {
- val txFactory = transactorConfig.factory.getOrElse(DefaultGlobalTransactionFactory)
- atomic(txFactory) {
- actor(message)
- setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
- }
- } else {
- actor(message)
- setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
- }
+ actor(message)
} catch {
- case e: DeadTransactionException =>
- handleExceptionInDispatch(
- new TransactionSetAbortedException("Transaction set has been aborted by another participant"),
- message, topLevelTransaction)
case e: InterruptedException => {} // received message while actor is shutting down, ignore
- case e => handleExceptionInDispatch(e, message, topLevelTransaction)
+ case e => handleExceptionInDispatch(e, message)
}
finally {
- clearTransaction
- if (topLevelTransaction) clearTransactionSet
checkReceiveTimeout // Reschedule receive timeout
}
}
@@ -1226,26 +1092,14 @@ class LocalActorRef private[akka] (
true
}
- private def handleExceptionInDispatch(reason: Throwable, message: Any, topLevelTransaction: Boolean) = {
+ private def handleExceptionInDispatch(reason: Throwable, message: Any) = {
Actor.log.error(reason, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
//Prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
- // abort transaction set
- if (isTransactionSetInScope) {
- val txSet = getTransactionSetInScope
- if (!txSet.isCommitted) {
- Actor.log.debug("Aborting transaction set [%s]", txSet)
- txSet.abort
- }
- }
-
senderFuture.foreach(_.completeWithException(reason))
- clearTransaction
- if (topLevelTransaction) clearTransactionSet
-
if (supervisor.isDefined) notifySupervisorWithMessage(Exit(this, reason))
else {
lifeCycle match {
@@ -1298,7 +1152,6 @@ class LocalActorRef private[akka] (
actor.preStart // run actor preStart
Actor.log.trace("[%s] has started", toString)
ActorRegistry.register(this)
- clearTransactionSet // clear transaction set that might have been created if atomic block has been used within the Actor constructor body
}
/*
@@ -1395,9 +1248,6 @@ private[akka] case class RemoteActorRef private[akka] (
def actorClass: Class[_ <: Actor] = unsupported
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
def dispatcher: MessageDispatcher = unsupported
- def makeTransactionRequired: Unit = unsupported
- def transactionConfig_=(config: TransactionConfig): Unit = unsupported
- def transactionConfig: TransactionConfig = unsupported
def makeRemote(hostname: String, port: Int): Unit = unsupported
def makeRemote(address: InetSocketAddress): Unit = unsupported
def homeAddress_=(address: InetSocketAddress): Unit = unsupported
diff --git a/akka-actor/src/main/scala/actor/UntypedActor.scala b/akka-actor/src/main/scala/actor/UntypedActor.scala
index b31077c8f3..1c51fa47f1 100644
--- a/akka-actor/src/main/scala/actor/UntypedActor.scala
+++ b/akka-actor/src/main/scala/actor/UntypedActor.scala
@@ -5,7 +5,6 @@
package akka.actor
import akka.dispatch._
-import akka.stm.global._
import akka.config.Supervision._
import akka.japi.Procedure
@@ -84,15 +83,6 @@ abstract class UntypedActor extends Actor {
def onReceive(message: Any): Unit
}
-/**
- * Implements the Transactor abstraction. E.g. a transactional UntypedActor.
- *
- * @author Jonas Bonér
- */
-abstract class UntypedTransactor extends UntypedActor {
- self.makeTransactionRequired
-}
-
/**
* Factory closure for an UntypedActor, to be used with 'UntypedActor.actorOf(factory)'.
*
diff --git a/akka-actor/src/main/scala/config/SupervisionConfig.scala b/akka-actor/src/main/scala/config/SupervisionConfig.scala
index 9f5af03ddf..f3d6e1ada9 100644
--- a/akka-actor/src/main/scala/config/SupervisionConfig.scala
+++ b/akka-actor/src/main/scala/config/SupervisionConfig.scala
@@ -105,7 +105,6 @@ object Supervision {
val target: Class[_],
val lifeCycle: LifeCycle,
val timeout: Long,
- val transactionRequired: Boolean,
_dispatcher: MessageDispatcher, // optional
_remoteAddress: RemoteAddress // optional
) extends Server {
@@ -114,48 +113,24 @@ object Supervision {
val remoteAddress: Option[RemoteAddress] = Option(_remoteAddress)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long) =
- this(null: Class[_], target, lifeCycle, timeout, false, null.asInstanceOf[MessageDispatcher], null: RemoteAddress)
+ this(null: Class[_], target, lifeCycle, timeout, null: MessageDispatcher, null: RemoteAddress)
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long) =
- this(intf, target, lifeCycle, timeout, false, null.asInstanceOf[MessageDispatcher], null: RemoteAddress)
+ this(intf, target, lifeCycle, timeout, null: MessageDispatcher, null: RemoteAddress)
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) =
- this(intf, target, lifeCycle, timeout, false, dispatcher, null)
+ this(intf, target, lifeCycle, timeout, dispatcher, null: RemoteAddress)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher) =
- this(null: Class[_], target, lifeCycle, timeout, false, dispatcher, null:RemoteAddress)
+ this(null: Class[_], target, lifeCycle, timeout, dispatcher, null: RemoteAddress)
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) =
- this(intf, target, lifeCycle, timeout, false, null: MessageDispatcher, remoteAddress)
+ this(intf, target, lifeCycle, timeout, null: MessageDispatcher, remoteAddress)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, remoteAddress: RemoteAddress) =
- this(null: Class[_], target, lifeCycle, timeout, false, null, remoteAddress)
-
- def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
- this(intf, target, lifeCycle, timeout, false, dispatcher, remoteAddress)
+ this(null: Class[_], target, lifeCycle, timeout, null: MessageDispatcher, remoteAddress)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
- this(null: Class[_], target, lifeCycle, timeout, false, dispatcher, remoteAddress)
-
- def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean) =
- this(intf, target, lifeCycle, timeout, transactionRequired, null: MessageDispatcher, null: RemoteAddress)
-
- def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean) =
- this(null: Class[_], target, lifeCycle, timeout, transactionRequired, null: MessageDispatcher, null: RemoteAddress)
-
- def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher) =
- this(intf, target, lifeCycle, timeout, transactionRequired, dispatcher, null: RemoteAddress)
-
- def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher) =
- this(null: Class[_], target, lifeCycle, timeout, transactionRequired, dispatcher, null: RemoteAddress)
-
- def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, remoteAddress: RemoteAddress) =
- this(intf, target, lifeCycle, timeout, transactionRequired, null: MessageDispatcher, remoteAddress)
-
- def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, remoteAddress: RemoteAddress) =
- this(null: Class[_], target, lifeCycle, timeout, transactionRequired, null: MessageDispatcher, remoteAddress)
-
- def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
- this(null: Class[_], target, lifeCycle, timeout, transactionRequired, dispatcher, remoteAddress)
+ this(null: Class[_], target, lifeCycle, timeout, dispatcher, remoteAddress)
}
}
diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala
index 33a7a62af3..467bccd13e 100644
--- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala
+++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala
@@ -4,8 +4,6 @@
package akka.dispatch
-import org.multiverse.commitbarriers.CountDownCommitBarrier
-
import java.util.concurrent._
import atomic. {AtomicInteger, AtomicBoolean, AtomicReference, AtomicLong}
import akka.util. {Switch, ReentrantGuard, Logging, HashCode}
@@ -17,8 +15,7 @@ import akka.actor._
final class MessageInvocation(val receiver: ActorRef,
val message: Any,
val sender: Option[ActorRef],
- val senderFuture: Option[CompletableFuture[Any]],
- val transactionSet: Option[CountDownCommitBarrier]) {
+ val senderFuture: Option[CompletableFuture[Any]]) {
if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null")
def invoke = try {
@@ -47,7 +44,6 @@ final class MessageInvocation(val receiver: ActorRef,
"\n\treceiver = " + receiver +
"\n\tsender = " + sender +
"\n\tsenderFuture = " + senderFuture +
- "\n\ttransactionSet = " + transactionSet +
"]"
}
}
diff --git a/akka-actor/src/main/scala/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/util/ReflectiveAccess.scala
index 3320522ed6..5257d596f0 100644
--- a/akka-actor/src/main/scala/util/ReflectiveAccess.scala
+++ b/akka-actor/src/main/scala/util/ReflectiveAccess.scala
@@ -7,7 +7,6 @@ package akka.util
import akka.actor.{ActorRef, IllegalActorStateException, ActorType, Uuid}
import akka.dispatch.{Future, CompletableFuture, MessageInvocation}
import akka.config.{Config, ModuleNotAvailableException}
-import akka.stm.Transaction
import akka.AkkaException
import java.net.InetSocketAddress
@@ -23,12 +22,10 @@ object ReflectiveAccess extends Logging {
lazy val isRemotingEnabled = RemoteClientModule.isRemotingEnabled
lazy val isTypedActorEnabled = TypedActorModule.isTypedActorEnabled
- lazy val isJtaEnabled = JtaModule.isJtaEnabled
lazy val isEnterpriseEnabled = EnterpriseModule.isEnterpriseEnabled
def ensureRemotingEnabled = RemoteClientModule.ensureRemotingEnabled
def ensureTypedActorEnabled = TypedActorModule.ensureTypedActorEnabled
- def ensureJtaEnabled = JtaModule.ensureJtaEnabled
def ensureEnterpriseEnabled = EnterpriseModule.ensureEnterpriseEnabled
/**
@@ -176,32 +173,6 @@ object ReflectiveAccess extends Logging {
}
}
- object JtaModule {
-
- type TransactionContainerObject = {
- def apply(): TransactionContainer
- }
-
- type TransactionContainer = {
- def beginWithStmSynchronization(transaction: Transaction): Unit
- def commit: Unit
- def rollback: Unit
- }
-
- lazy val isJtaEnabled = transactionContainerObjectInstance.isDefined
-
- def ensureJtaEnabled = if (!isJtaEnabled) throw new ModuleNotAvailableException(
- "Can't load the typed actor module, make sure that akka-jta.jar is on the classpath")
-
- val transactionContainerObjectInstance: Option[TransactionContainerObject] =
- getObjectFor("akka.jta.TransactionContainer$")
-
- def createTransactionContainer: TransactionContainer = {
- ensureJtaEnabled
- transactionContainerObjectInstance.get.apply.asInstanceOf[TransactionContainer]
- }
- }
-
object EnterpriseModule {
type Mailbox = {
diff --git a/akka-camel/src/main/scala/Consumer.scala b/akka-camel/src/main/scala/Consumer.scala
index 8720f34a39..a6323c3bae 100644
--- a/akka-camel/src/main/scala/Consumer.scala
+++ b/akka-camel/src/main/scala/Consumer.scala
@@ -55,7 +55,6 @@ trait Consumer { self: Actor =>
*
* @see UntypedConsumerActor
* @see RemoteUntypedConsumerActor
- * @see UntypedConsumerTransactor
*
* @author Martin Krasser
*/
@@ -82,12 +81,6 @@ trait UntypedConsumer extends Consumer { self: UntypedActor =>
*/
abstract class UntypedConsumerActor extends UntypedActor with UntypedConsumer
-/**
- * Subclass this abstract class to create an MDB-style transacted untyped consumer
- * actor. This class is meant to be used from Java.
- */
-abstract class UntypedConsumerTransactor extends UntypedTransactor with UntypedConsumer
-
/**
* Subclass this abstract class to create an MDB-style remote untyped consumer
* actor. This class is meant to be used from Java.
diff --git a/akka-camel/src/main/scala/component/ActorComponent.scala b/akka-camel/src/main/scala/component/ActorComponent.scala
index c1f5cd56a1..e84a894ee3 100644
--- a/akka-camel/src/main/scala/component/ActorComponent.scala
+++ b/akka-camel/src/main/scala/component/ActorComponent.scala
@@ -16,7 +16,6 @@ import akka.actor._
import akka.camel.{Failure, Message}
import akka.camel.CamelMessageConversion.toExchangeAdapter
import akka.dispatch.{CompletableFuture, MessageInvocation, MessageDispatcher}
-import akka.stm.TransactionConfig
import scala.reflect.BeanProperty
@@ -274,9 +273,6 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall
def actorClassName = unsupported
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
def dispatcher: MessageDispatcher = unsupported
- def transactionConfig_=(config: TransactionConfig): Unit = unsupported
- def transactionConfig: TransactionConfig = unsupported
- def makeTransactionRequired: Unit = unsupported
def makeRemote(hostname: String, port: Int): Unit = unsupported
def makeRemote(address: InetSocketAddress): Unit = unsupported
def homeAddress_=(address: InetSocketAddress): Unit = unsupported
diff --git a/akka-jta/src/test/scala/ReflectiveAccessSpec.scala b/akka-jta/src/test/scala/ReflectiveAccessSpec.scala
index 1b13ef67b5..417aec6eab 100644
--- a/akka-jta/src/test/scala/ReflectiveAccessSpec.scala
+++ b/akka-jta/src/test/scala/ReflectiveAccessSpec.scala
@@ -6,11 +6,11 @@ package akka.jta
import org.scalatest.junit.JUnitSuite
import org.junit.Test
-import akka.util.ReflectiveAccess
+import akka.stm.JtaModule
class ReflectiveAccessSpec extends JUnitSuite {
@Test def ensureReflectiveAccessCanLoadTransactionContainer {
- ReflectiveAccess.JtaModule.ensureJtaEnabled
- assert(ReflectiveAccess.JtaModule.transactionContainerObjectInstance.isDefined)
+ JtaModule.ensureJtaEnabled
+ assert(JtaModule.transactionContainerObjectInstance.isDefined)
}
}
diff --git a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala
index c877a06a1e..a425f0d7f5 100644
--- a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala
+++ b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala
@@ -1,7 +1,8 @@
package akka.persistence.cassandra
-import akka.actor.{Actor, ActorRef, Transactor}
+import akka.actor.{Actor, ActorRef}
import Actor._
+import akka.stm.local._
import org.junit.Test
import org.junit.Assert._
@@ -17,22 +18,24 @@ case class SetMapState(key: String, value: String)
case class SetVectorState(key: String)
case class SetRefState(key: String)
case class Success(key: String, value: String)
-case class Failure(key: String, value: String, failer: ActorRef)
+case class Failure(key: String, value: String)
case class SetMapStateOneWay(key: String, value: String)
case class SetVectorStateOneWay(key: String)
case class SetRefStateOneWay(key: String)
case class SuccessOneWay(key: String, value: String)
-case class FailureOneWay(key: String, value: String, failer: ActorRef)
+case class FailureOneWay(key: String, value: String)
-class CassandraPersistentActor extends Transactor {
+class CassandraPersistentActor extends Actor {
self.timeout = 100000
- private lazy val mapState = CassandraStorage.newMap
- private lazy val vectorState = CassandraStorage.newVector
- private lazy val refState = CassandraStorage.newRef
+ private val mapState = CassandraStorage.newMap
+ private val vectorState = CassandraStorage.newVector
+ private val refState = CassandraStorage.newRef
- def receive = {
+ def receive = { case message => atomic { atomicReceive(message) } }
+
+ def atomicReceive: Receive = {
case GetMapState(key) =>
self.reply(mapState.get(key.getBytes("UTF-8")).get)
case GetVectorSize =>
@@ -53,26 +56,21 @@ class CassandraPersistentActor extends Transactor {
vectorState.add(msg.getBytes("UTF-8"))
refState.swap(msg.getBytes("UTF-8"))
self.reply(msg)
- case Failure(key, msg, failer) =>
+ case Failure(key, msg) =>
mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8"))
vectorState.add(msg.getBytes("UTF-8"))
refState.swap(msg.getBytes("UTF-8"))
- failer !! "Failure"
+ fail
self.reply(msg)
}
-}
-@serializable class PersistentFailerActor extends Transactor {
- def receive = {
- case "Failure" =>
- throw new RuntimeException("Expected exception; to test fault-tolerance")
- }
+ def fail = throw new RuntimeException("Expected exception; to test fault-tolerance")
}
class CassandraPersistentActorSpec extends JUnitSuite {
- //@Before
- //def startCassandra = EmbeddedCassandraService.start
+ // @Before
+ // def startCassandra = EmbeddedCassandraService.start
@Test
def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
@@ -89,10 +87,8 @@ class CassandraPersistentActorSpec extends JUnitSuite {
val stateful = actorOf[CassandraPersistentActor]
stateful.start
stateful !! SetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
- val failer = actorOf[PersistentFailerActor]
- failer.start
try {
- stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
+ stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state") // call failing transactionrequired method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
val result = (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).as[Array[Byte]].get
@@ -113,10 +109,8 @@ class CassandraPersistentActorSpec extends JUnitSuite {
val stateful = actorOf[CassandraPersistentActor]
stateful.start
stateful !! SetVectorState("init") // set init state
- val failer = actorOf[PersistentFailerActor]
- failer.start
try {
- stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
+ stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state") // call failing transactionrequired method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals(1, (stateful !! GetVectorSize).get.asInstanceOf[java.lang.Integer].intValue)
@@ -137,10 +131,8 @@ class CassandraPersistentActorSpec extends JUnitSuite {
val stateful = actorOf[CassandraPersistentActor]
stateful.start
stateful !! SetRefState("init") // set init state
- val failer = actorOf[PersistentFailerActor]
- failer.start
try {
- stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
+ stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state") // call failing transactionrequired method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
val result = (stateful !! GetRefState).as[Array[Byte]].get
@@ -148,9 +140,10 @@ class CassandraPersistentActorSpec extends JUnitSuite {
}
}
+
/*
-import org.apache.cassandra.service.CassandraDaemon
-object // EmbeddedCassandraService {
+object EmbeddedCassandraService {
+ import org.apache.cassandra.thrift.CassandraDaemon
System.setProperty("storage-config", "src/test/resources");
diff --git a/akka-persistence/akka-persistence-couchdb/src/test/scala/CouchDBPersistentActorSpec.scala b/akka-persistence/akka-persistence-couchdb/src/test/scala/CouchDBPersistentActorSpec.scala
index 1e8da99da5..7d418375af 100644
--- a/akka-persistence/akka-persistence-couchdb/src/test/scala/CouchDBPersistentActorSpec.scala
+++ b/akka-persistence/akka-persistence-couchdb/src/test/scala/CouchDBPersistentActorSpec.scala
@@ -6,18 +6,19 @@ import org.scalatest.BeforeAndAfterEach
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
-import akka.actor.{Transactor, Actor, ActorRef}
+import akka.actor.{Actor, ActorRef}
import Actor._
+import akka.stm.local
case class Balance(accountNo: String)
-case class Debit(accountNo: String, amount: Int, failer: ActorRef)
-case class MultiDebit(accountNo: String, amounts: List[Int], failer: ActorRef)
+case class Debit(accountNo: String, amount: Int)
+case class MultiDebit(accountNo: String, amounts: List[Int])
case class Credit(accountNo: String, amount: Int)
case class Log(start: Int, finish: Int)
case object LogSize
-class BankAccountActor extends Transactor {
+class BankAccountActor extends Actor {
private val accountState = CouchDBStorage.newMap
private val txnLog = CouchDBStorage.newVector
@@ -25,7 +26,9 @@ class BankAccountActor extends Transactor {
import sjson.json.DefaultProtocol._
import sjson.json.JsonSerialization._
- def receive: Receive = {
+ def receive = { case message => local.atomic { atomicReceive(message) } }
+
+ def atomicReceive: Receive = {
// check balance
case Balance(accountNo) =>
txnLog.add(("Balance:" + accountNo).getBytes)
@@ -35,20 +38,20 @@ class BankAccountActor extends Transactor {
.getOrElse(0))
// debit amount: can fail
- case Debit(accountNo, amount, failer) =>
+ case Debit(accountNo, amount) =>
txnLog.add(("Debit:" + accountNo + " " + amount).getBytes)
val m = accountState.get(accountNo.getBytes)
.map(frombinary[Int](_))
.getOrElse(0)
accountState.put(accountNo.getBytes, tobinary(m - amount))
- if (amount > m) failer !! "Failure"
+ if (amount > m) fail
self.reply(m - amount)
// many debits: can fail
// demonstrates true rollback even if multiple puts have been done
- case MultiDebit(accountNo, amounts, failer) =>
+ case MultiDebit(accountNo, amounts) =>
val sum = amounts.foldRight(0)(_ + _)
txnLog.add(("MultiDebit:" + accountNo + " " + sum).getBytes)
@@ -61,7 +64,7 @@ class BankAccountActor extends Transactor {
amount =>
accountState.put(accountNo.getBytes, tobinary(m - amount))
cbal = cbal - amount
- if (cbal < 0) failer !! "Failure"
+ if (cbal < 0) fail
}
self.reply(m - sum)
@@ -83,13 +86,8 @@ class BankAccountActor extends Transactor {
case Log(start, finish) =>
self.reply(txnLog.slice(start, finish).map(new String(_)))
}
-}
-@serializable class PersistentFailerActor extends Transactor {
- def receive = {
- case "Failure" =>
- throw new RuntimeException("Expected exception; to test fault-tolerance")
- }
+ def fail = throw new RuntimeException("Expected exception; to test fault-tolerance")
}
@RunWith(classOf[JUnitRunner])
@@ -108,25 +106,23 @@ class CouchDBPersistentActor extends
describe("successful debit") {
it("should debit successfully") {
- log.info("Succesful Debit starting")
+ Actor.log.info("Succesful Debit starting")
val bactor = actorOf[BankAccountActor]
bactor.start
- val failer = actorOf[PersistentFailerActor]
- failer.start
bactor !! Credit("a-123", 5000)
- log.info("credited")
- bactor !! Debit("a-123", 3000, failer)
- log.info("debited")
+ Actor.log.info("credited")
+ bactor !! Debit("a-123", 3000)
+ Actor.log.info("debited")
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(2000)
- log.info("balane matched")
+ Actor.log.info("balane matched")
bactor !! Credit("a-123", 7000)
- log.info("Credited")
+ Actor.log.info("Credited")
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(9000)
- log.info("Balance matched")
- bactor !! Debit("a-123", 8000, failer)
- log.info("Debited")
+ Actor.log.info("Balance matched")
+ bactor !! Debit("a-123", 8000)
+ Actor.log.info("Debited")
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(1000)
- log.info("Balance matched")
+ Actor.log.info("Balance matched")
(bactor !! LogSize).get.asInstanceOf[Int] should equal(7)
(bactor !! Log(0, 7)).get.asInstanceOf[Iterable[String]].size should equal(7)
}
@@ -136,12 +132,10 @@ class CouchDBPersistentActor extends
it("debit should fail") {
val bactor = actorOf[BankAccountActor]
bactor.start
- val failer = actorOf[PersistentFailerActor]
- failer.start
bactor !! Credit("a-123", 5000)
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
evaluating {
- bactor !! Debit("a-123", 7000, failer)
+ bactor !! Debit("a-123", 7000)
} should produce [Exception]
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
(bactor !! LogSize).get.asInstanceOf[Int] should equal(3)
@@ -152,12 +146,10 @@ class CouchDBPersistentActor extends
it("multidebit should fail") {
val bactor = actorOf[BankAccountActor]
bactor.start
- val failer = actorOf[PersistentFailerActor]
- failer.start
bactor !! Credit("a-123", 5000)
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
evaluating {
- bactor !! MultiDebit("a-123", List(1000, 2000, 4000), failer)
+ bactor !! MultiDebit("a-123", List(1000, 2000, 4000))
} should produce [Exception]
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
(bactor !! LogSize).get.asInstanceOf[Int] should equal(3)
diff --git a/akka-persistence/akka-persistence-hbase/src/test/scala/HbasePersistentActorSpecTestIntegration.scala b/akka-persistence/akka-persistence-hbase/src/test/scala/HbasePersistentActorSpecTestIntegration.scala
index 8992ce71d7..2a8ef2c8e3 100644
--- a/akka-persistence/akka-persistence-hbase/src/test/scala/HbasePersistentActorSpecTestIntegration.scala
+++ b/akka-persistence/akka-persistence-hbase/src/test/scala/HbasePersistentActorSpecTestIntegration.scala
@@ -1,7 +1,8 @@
package akka.persistence.hbase
-import akka.actor.{ Actor, ActorRef, Transactor }
+import akka.actor.{ Actor, ActorRef }
import Actor._
+import akka.stm.local._
import org.junit.Test
import org.junit.Assert._
@@ -23,22 +24,24 @@ case class SetMapState(key: String, value: String)
case class SetVectorState(key: String)
case class SetRefState(key: String)
case class Success(key: String, value: String)
-case class Failure(key: String, value: String, failer: ActorRef)
+case class Failure(key: String, value: String)
case class SetMapStateOneWay(key: String, value: String)
case class SetVectorStateOneWay(key: String)
case class SetRefStateOneWay(key: String)
case class SuccessOneWay(key: String, value: String)
-case class FailureOneWay(key: String, value: String, failer: ActorRef)
+case class FailureOneWay(key: String, value: String)
-class HbasePersistentActor extends Transactor {
+class HbasePersistentActor extends Actor {
self.timeout = 100000
- private lazy val mapState = HbaseStorage.newMap
- private lazy val vectorState = HbaseStorage.newVector
- private lazy val refState = HbaseStorage.newRef
+ private val mapState = HbaseStorage.newMap
+ private val vectorState = HbaseStorage.newVector
+ private val refState = HbaseStorage.newRef
- def receive = {
+ def receive = { case message => atomic { atomicReceive(message) } }
+
+ def atomicReceive: Receive = {
case GetMapState(key) =>
self.reply(mapState.get(key.getBytes("UTF-8")).get)
case GetVectorSize =>
@@ -59,21 +62,15 @@ class HbasePersistentActor extends Transactor {
vectorState.add(msg.getBytes("UTF-8"))
refState.swap(msg.getBytes("UTF-8"))
self.reply(msg)
- case Failure(key, msg, failer) =>
+ case Failure(key, msg) =>
mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8"))
vectorState.add(msg.getBytes("UTF-8"))
refState.swap(msg.getBytes("UTF-8"))
- failer !! "Failure"
+ fail
self.reply(msg)
}
-}
-@serializable
-class PersistentFailerActor extends Transactor {
- def receive = {
- case "Failure" =>
- throw new RuntimeException("Expected exception; to test fault-tolerance")
- }
+ def fail = throw new RuntimeException("Expected exception; to test fault-tolerance")
}
class HbasePersistentActorSpecTestIntegration extends JUnitSuite with BeforeAndAfterAll {
@@ -113,10 +110,8 @@ class HbasePersistentActorSpecTestIntegration extends JUnitSuite with BeforeAndA
val stateful = actorOf[HbasePersistentActor]
stateful.start
stateful !! SetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
- val failer = actorOf[PersistentFailerActor]
- failer.start
try {
- stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
+ stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state") // call failing transactionrequired method
fail("should have thrown an exception")
} catch { case e: RuntimeException => {} }
val result = (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).as[Array[Byte]].get
@@ -137,10 +132,8 @@ class HbasePersistentActorSpecTestIntegration extends JUnitSuite with BeforeAndA
val stateful = actorOf[HbasePersistentActor]
stateful.start
stateful !! SetVectorState("init") // set init state
- val failer = actorOf[PersistentFailerActor]
- failer.start
try {
- stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
+ stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state") // call failing transactionrequired method
fail("should have thrown an exception")
} catch { case e: RuntimeException => {} }
assertEquals(1, (stateful !! GetVectorSize).get.asInstanceOf[java.lang.Integer].intValue)
@@ -161,10 +154,8 @@ class HbasePersistentActorSpecTestIntegration extends JUnitSuite with BeforeAndA
val stateful = actorOf[HbasePersistentActor]
stateful.start
stateful !! SetRefState("init") // set init state
- val failer = actorOf[PersistentFailerActor]
- failer.start
try {
- stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
+ stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state") // call failing transactionrequired method
fail("should have thrown an exception")
} catch { case e: RuntimeException => {} }
val result = (stateful !! GetRefState).as[Array[Byte]].get
diff --git a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala
index 8708d1b45e..597a847e1b 100644
--- a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala
+++ b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala
@@ -6,26 +6,29 @@ import org.scalatest.BeforeAndAfterEach
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
-import akka.actor.{Transactor, Actor, ActorRef}
+import akka.actor.{Actor, ActorRef}
import Actor._
+import akka.stm.local._
case class Balance(accountNo: String)
-case class Debit(accountNo: String, amount: Int, failer: ActorRef)
-case class MultiDebit(accountNo: String, amounts: List[Int], failer: ActorRef)
+case class Debit(accountNo: String, amount: Int)
+case class MultiDebit(accountNo: String, amounts: List[Int])
case class Credit(accountNo: String, amount: Int)
case class Log(start: Int, finish: Int)
case object LogSize
-class BankAccountActor extends Transactor {
+class BankAccountActor extends Actor {
- private lazy val accountState = MongoStorage.newMap
- private lazy val txnLog = MongoStorage.newVector
+ private val accountState = MongoStorage.newMap
+ private val txnLog = MongoStorage.newVector
import sjson.json.DefaultProtocol._
import sjson.json.JsonSerialization._
- def receive: Receive = {
+ def receive = { case message => atomic { atomicReceive(message) } }
+
+ def atomicReceive: Receive = {
// check balance
case Balance(accountNo) =>
txnLog.add(("Balance:" + accountNo).getBytes)
@@ -35,20 +38,20 @@ class BankAccountActor extends Transactor {
.getOrElse(0))
// debit amount: can fail
- case Debit(accountNo, amount, failer) =>
+ case Debit(accountNo, amount) =>
txnLog.add(("Debit:" + accountNo + " " + amount).getBytes)
val m = accountState.get(accountNo.getBytes)
.map(frombinary[Int](_))
.getOrElse(0)
accountState.put(accountNo.getBytes, tobinary(m - amount))
- if (amount > m) failer !! "Failure"
+ if (amount > m) fail
self.reply(m - amount)
// many debits: can fail
// demonstrates true rollback even if multiple puts have been done
- case MultiDebit(accountNo, amounts, failer) =>
+ case MultiDebit(accountNo, amounts) =>
val sum = amounts.foldRight(0)(_ + _)
txnLog.add(("MultiDebit:" + accountNo + " " + sum).getBytes)
@@ -60,7 +63,7 @@ class BankAccountActor extends Transactor {
amounts.foreach { amount =>
accountState.put(accountNo.getBytes, tobinary(m - amount))
cbal = cbal - amount
- if (cbal < 0) failer !! "Failure"
+ if (cbal < 0) fail
}
self.reply(m - sum)
@@ -82,13 +85,8 @@ class BankAccountActor extends Transactor {
case Log(start, finish) =>
self.reply(txnLog.slice(start, finish).map(new String(_)))
}
-}
-@serializable class PersistentFailerActor extends Transactor {
- def receive = {
- case "Failure" =>
- throw new RuntimeException("Expected exception; to test fault-tolerance")
- }
+ def fail = throw new RuntimeException("Expected exception; to test fault-tolerance")
}
@RunWith(classOf[JUnitRunner])
@@ -109,17 +107,15 @@ class MongoPersistentActorSpec extends
it("should debit successfully") {
val bactor = actorOf[BankAccountActor]
bactor.start
- val failer = actorOf[PersistentFailerActor]
- failer.start
bactor !! Credit("a-123", 5000)
- bactor !! Debit("a-123", 3000, failer)
+ bactor !! Debit("a-123", 3000)
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(2000)
bactor !! Credit("a-123", 7000)
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(9000)
- bactor !! Debit("a-123", 8000, failer)
+ bactor !! Debit("a-123", 8000)
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(1000)
(bactor !! LogSize).get.asInstanceOf[Int] should equal(7)
@@ -131,12 +127,10 @@ class MongoPersistentActorSpec extends
it("debit should fail") {
val bactor = actorOf[BankAccountActor]
bactor.start
- val failer = actorOf[PersistentFailerActor]
- failer.start
bactor !! Credit("a-123", 5000)
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
evaluating {
- bactor !! Debit("a-123", 7000, failer)
+ bactor !! Debit("a-123", 7000)
} should produce [Exception]
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
(bactor !! LogSize).get.asInstanceOf[Int] should equal(3)
@@ -147,12 +141,10 @@ class MongoPersistentActorSpec extends
it("multidebit should fail") {
val bactor = actorOf[BankAccountActor]
bactor.start
- val failer = actorOf[PersistentFailerActor]
- failer.start
bactor !! Credit("a-123", 5000)
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
evaluating {
- bactor !! MultiDebit("a-123", List(1000, 2000, 4000), failer)
+ bactor !! MultiDebit("a-123", List(1000, 2000, 4000))
} should produce [Exception]
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
(bactor !! LogSize).get.asInstanceOf[Int] should equal(3)
diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
index c487741ea6..a28a5485ed 100644
--- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
+++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
@@ -5,8 +5,9 @@ import org.scalatest.junit.JUnitSuite
import org.junit.{Test, Before}
import org.junit.Assert._
-import akka.actor.{ActorRef, Transactor}
+import akka.actor.{Actor, ActorRef}
import akka.actor.Actor._
+import akka.stm.local._
/**
* A persistent actor based on Redis storage.
@@ -22,44 +23,47 @@ import akka.actor.Actor._
*/
case class Balance(accountNo: String)
-case class Debit(accountNo: String, amount: Int, failer: ActorRef)
-case class MultiDebit(accountNo: String, amounts: List[Int], failer: ActorRef)
+case class Debit(accountNo: String, amount: Int)
+case class MultiDebit(accountNo: String, amounts: List[Int])
case class Credit(accountNo: String, amount: Int)
case object LogSize
-class AccountActor extends Transactor {
- private lazy val accountState = RedisStorage.newMap
+class AccountActor extends Actor {
+ private val accountState = RedisStorage.newMap
private val txnLog = RedisStorage.newVector
self.timeout = 100000
- def receive = {
+ def receive = { case message => atomic { atomicReceive(message) } }
+
+ def atomicReceive: Receive = {
// check balance
case Balance(accountNo) =>
txnLog.add("Balance:%s".format(accountNo).getBytes)
self.reply(new String(accountState.get(accountNo.getBytes).get).toInt)
// debit amount: can fail
- case Debit(accountNo, amount, failer) =>
+ case Debit(accountNo, amount) =>
txnLog.add("Debit:%s %s".format(accountNo, amount.toString).getBytes)
- val Some(m) = accountState.get(accountNo.getBytes).map(x => (new String(x)).toInt) orElse Some(0)
+ val Some(m) = accountState.get(accountNo.getBytes).map(x => (new String(x)).toInt) orElse Some(0)
accountState.put(accountNo.getBytes, (m - amount).toString.getBytes)
- if (amount > m)
- failer !! "Failure"
+ if (amount > m) fail
+
self.reply(m - amount)
// many debits: can fail
// demonstrates true rollback even if multiple puts have been done
- case MultiDebit(accountNo, amounts, failer) =>
+ case MultiDebit(accountNo, amounts) =>
txnLog.add("MultiDebit:%s %s".format(accountNo, amounts.map(_.intValue).foldLeft(0)(_ + _).toString).getBytes)
- val Some(m) = accountState.get(accountNo.getBytes).map(x => (new String(x)).toInt) orElse Some(0)
+ val Some(m) = accountState.get(accountNo.getBytes).map(x => (new String(x)).toInt) orElse Some(0)
var bal = 0
amounts.foreach {amount =>
bal = bal + amount
accountState.put(accountNo.getBytes, (m - bal).toString.getBytes)
}
- if (bal > m) failer !! "Failure"
+ if (bal > m) fail
+
self.reply(m - bal)
// credit amount
@@ -73,14 +77,8 @@ class AccountActor extends Transactor {
case LogSize =>
self.reply(txnLog.length.asInstanceOf[AnyRef])
}
-}
-@serializable class PersistentFailerActor extends Transactor {
- self.timeout = 5000
- def receive = {
- case "Failure" =>
- throw new RuntimeException("Expected exception; to test fault-tolerance")
- }
+ def fail = throw new RuntimeException("Expected exception; to test fault-tolerance")
}
class RedisPersistentActorSpec extends JUnitSuite {
@@ -88,16 +86,14 @@ class RedisPersistentActorSpec extends JUnitSuite {
def testSuccessfulDebit = {
val bactor = actorOf(new AccountActor)
bactor.start
- val failer = actorOf(new PersistentFailerActor)
- failer.start
bactor !! Credit("a-123", 5000)
- bactor !! Debit("a-123", 3000, failer)
+ bactor !! Debit("a-123", 3000)
assertEquals(2000, (bactor !! Balance("a-123")).get)
bactor !! Credit("a-123", 7000)
assertEquals(9000, (bactor !! Balance("a-123")).get)
- bactor !! Debit("a-123", 8000, failer)
+ bactor !! Debit("a-123", 8000)
assertEquals(1000, (bactor !! Balance("a-123")).get)
val c = (bactor !! LogSize).as[Int].get
@@ -111,10 +107,8 @@ class RedisPersistentActorSpec extends JUnitSuite {
bactor !! Credit("a-123", 5000)
assertEquals(5000, (bactor !! Balance("a-123")).get)
- val failer = actorOf[PersistentFailerActor]
- failer.start
try {
- bactor !! Debit("a-123", 7000, failer)
+ bactor !! Debit("a-123", 7000)
fail("should throw exception")
} catch { case e: RuntimeException => {}}
@@ -133,10 +127,8 @@ class RedisPersistentActorSpec extends JUnitSuite {
assertEquals(5000, (bactor !! (Balance("a-123"), 5000)).get)
- val failer = actorOf[PersistentFailerActor]
- failer.start
try {
- bactor !! MultiDebit("a-123", List(500, 2000, 1000, 3000), failer)
+ bactor !! MultiDebit("a-123", List(500, 2000, 1000, 3000))
fail("should throw exception")
} catch { case e: RuntimeException => {}}
diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentQSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentQSpec.scala
index f21b29064f..e8f127f218 100644
--- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentQSpec.scala
+++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentQSpec.scala
@@ -3,8 +3,9 @@ package akka.persistence.redis
import org.junit.{Test, Before}
import org.junit.Assert._
-import akka.actor.{Actor, ActorRef, Transactor}
+import akka.actor.{Actor, ActorRef}
import Actor._
+import akka.stm.local._
/**
* A persistent actor based on Redis queue storage.
@@ -15,13 +16,15 @@ import Actor._
case class NQ(accountNo: String)
case object DQ
-case class MNDQ(accountNos: List[String], noOfDQs: Int, failer: ActorRef)
+case class MNDQ(accountNos: List[String], noOfDQs: Int)
case object SZ
-class QueueActor extends Transactor {
- private lazy val accounts = RedisStorage.newQueue
+class QueueActor extends Actor {
+ private val accounts = RedisStorage.newQueue
- def receive = {
+ def receive = { case message => atomic { atomicReceive(message) } }
+
+ def atomicReceive: Receive = {
// enqueue
case NQ(accountNo) =>
accounts.enqueue(accountNo.getBytes)
@@ -33,13 +36,12 @@ class QueueActor extends Transactor {
self.reply(d)
// multiple NQ and DQ
- case MNDQ(enqs, no, failer) =>
+ case MNDQ(enqs, no) =>
accounts.enqueue(enqs.map(_.getBytes): _*)
try {
(1 to no).foreach(e => accounts.dequeue)
} catch {
- case e: Exception =>
- failer !! "Failure"
+ case e: Exception => fail
}
self.reply(true)
@@ -47,6 +49,8 @@ class QueueActor extends Transactor {
case SZ =>
self.reply(accounts.size)
}
+
+ def fail = throw new RuntimeException("Expected exception; to test fault-tolerance")
}
import org.scalatest.junit.JUnitSuite
@@ -82,8 +86,6 @@ class RedisPersistentQSpec extends JUnitSuite {
def testSuccessfulMNDQ = {
val qa = actorOf[QueueActor]
qa.start
- val failer = actorOf[PersistentFailerActor]
- failer.start
qa !! NQ("a-123")
qa !! NQ("a-124")
@@ -93,7 +95,7 @@ class RedisPersistentQSpec extends JUnitSuite {
assertEquals("a-123", (qa !! DQ).get)
val s = (qa !! SZ).as[Int].get
assertTrue(2 == s)
- qa !! MNDQ(List("a-126", "a-127"), 2, failer)
+ qa !! MNDQ(List("a-126", "a-127"), 2)
val u = (qa !! SZ).as[Int].get
assertTrue(2 == u)
}
@@ -102,8 +104,6 @@ class RedisPersistentQSpec extends JUnitSuite {
def testMixedMNDQ = {
val qa = actorOf[QueueActor]
qa.start
- val failer = actorOf[PersistentFailerActor]
- failer.start
// 3 enqueues
qa !! NQ("a-123")
@@ -121,7 +121,7 @@ class RedisPersistentQSpec extends JUnitSuite {
assertTrue(2 == s)
// enqueue 2, dequeue 2 => size == 2
- qa !! MNDQ(List("a-126", "a-127"), 2, failer)
+ qa !! MNDQ(List("a-126", "a-127"), 2)
val u = (qa !! SZ).as[Int].get
assertTrue(2 == u)
@@ -135,7 +135,7 @@ class RedisPersistentQSpec extends JUnitSuite {
// dequeue 6 => fail transaction
// size should remain 4
try {
- qa !! MNDQ(List("a-130"), 6, failer)
+ qa !! MNDQ(List("a-130"), 6)
} catch { case e: Exception => {} }
val w = (qa !! SZ).as[Int].get
diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala
index 6ad57da8bd..d8a061a64d 100644
--- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala
+++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala
@@ -7,8 +7,9 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
-import akka.actor.{Actor, ActorRef, Transactor}
+import akka.actor.{Actor, ActorRef}
import Actor._
+import akka.stm.local._
/**
* A persistent actor based on Redis sortedset storage.
@@ -43,15 +44,17 @@ case class SCORE(h: Hacker)
case class RANGE(start: Int, end: Int)
// add and remove subject to the condition that there will be at least 3 hackers
-case class MULTI(add: List[Hacker], rem: List[Hacker], failer: ActorRef)
+case class MULTI(add: List[Hacker], rem: List[Hacker])
case class MULTIRANGE(add: List[Hacker])
-class SortedSetActor extends Transactor {
+class SortedSetActor extends Actor {
self.timeout = 100000
- private lazy val hackers = RedisStorage.newSortedSet
+ private val hackers = RedisStorage.newSortedSet
- def receive = {
+ def receive = { case message => atomic { atomicReceive(message) } }
+
+ def atomicReceive: Receive = {
case ADD(h) =>
hackers.+(h.name.getBytes, h.zscore)
self.reply(true)
@@ -69,7 +72,7 @@ class SortedSetActor extends Transactor {
case RANGE(s, e) =>
self.reply(hackers.zrange(s, e))
- case MULTI(a, r, failer) =>
+ case MULTI(a, r) =>
a.foreach{ h: Hacker =>
hackers.+(h.name.getBytes, h.zscore)
}
@@ -80,8 +83,7 @@ class SortedSetActor extends Transactor {
hackers.-(h.name.getBytes)
}
} catch {
- case e: Exception =>
- failer !! "Failure"
+ case e: Exception => fail
}
self.reply((a.size, r.size))
@@ -91,6 +93,8 @@ class SortedSetActor extends Transactor {
}
self.reply(hackers.zrange(0, -1))
}
+
+ def fail = throw new RuntimeException("Expected exception; to test fault-tolerance")
}
import RedisStorageBackend._
@@ -178,13 +182,10 @@ class RedisPersistentSortedSetSpec extends
val qa = actorOf[SortedSetActor]
qa.start
- val failer = actorOf[PersistentFailerActor]
- failer.start
-
(qa !! SIZE).get.asInstanceOf[Int] should equal(0)
val add = List(h1, h2, h3, h4)
val rem = List(h2)
- (qa !! MULTI(add, rem, failer)).get.asInstanceOf[Tuple2[Int, Int]] should equal((4,1))
+ (qa !! MULTI(add, rem)).get.asInstanceOf[Tuple2[Int, Int]] should equal((4,1))
(qa !! SIZE).get.asInstanceOf[Int] should equal(3)
// size == 3
@@ -194,7 +195,7 @@ class RedisPersistentSortedSetSpec extends
// remove 3
val rem1 = List(h1, h3, h4, h5)
try {
- qa !! MULTI(add1, rem1, failer)
+ qa !! MULTI(add1, rem1)
} catch { case e: RuntimeException => {} }
(qa !! SIZE).get.asInstanceOf[Int] should equal(3)
}
diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala
index 7a14503335..b0b0bdb2b3 100644
--- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala
+++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala
@@ -6,14 +6,15 @@ import org.scalatest.BeforeAndAfterEach
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
-import akka.actor.{Transactor, Actor, ActorRef}
+import akka.actor.{Actor, ActorRef}
import Actor._
import BankAccountActor._
+import akka.stm.local._
case class Balance(accountNo: String)
-case class Debit(accountNo: String, amount: Int, failer: ActorRef)
-case class MultiDebit(accountNo: String, amounts: List[Int], failer: ActorRef)
+case class Debit(accountNo: String, amount: Int)
+case class MultiDebit(accountNo: String, amounts: List[Int])
case class Credit(accountNo: String, amount: Int)
case class Log(start: Int, finish: Int)
case object LogSize
@@ -23,14 +24,16 @@ object BankAccountActor {
val tx = "txnLog"
}
-class BankAccountActor extends Transactor {
+class BankAccountActor extends Actor {
private val accountState = VoldemortStorage.newMap(state)
private val txnLog = VoldemortStorage.newVector(tx)
import sjson.json.DefaultProtocol._
import sjson.json.JsonSerialization._
- def receive: Receive = {
+ def receive = { case message => atomic { atomicReceive(message) } }
+
+ def atomicReceive: Receive = {
// check balance
case Balance(accountNo) =>
txnLog.add(("Balance:" + accountNo).getBytes)
@@ -40,20 +43,20 @@ class BankAccountActor extends Transactor {
.getOrElse(0))
// debit amount: can fail
- case Debit(accountNo, amount, failer) =>
+ case Debit(accountNo, amount) =>
txnLog.add(("Debit:" + accountNo + " " + amount).getBytes)
val m = accountState.get(accountNo.getBytes)
.map(frombinary[Int](_))
.getOrElse(0)
accountState.put(accountNo.getBytes, tobinary(m - amount))
- if (amount > m) failer !! "Failure"
+ if (amount > m) fail
self.reply(m - amount)
// many debits: can fail
// demonstrates true rollback even if multiple puts have been done
- case MultiDebit(accountNo, amounts, failer) =>
+ case MultiDebit(accountNo, amounts) =>
val sum = amounts.foldRight(0)(_ + _)
txnLog.add(("MultiDebit:" + accountNo + " " + sum).getBytes)
@@ -66,7 +69,7 @@ class BankAccountActor extends Transactor {
amount =>
accountState.put(accountNo.getBytes, tobinary(m - amount))
cbal = cbal - amount
- if (cbal < 0) failer !! "Failure"
+ if (cbal < 0) fail
}
self.reply(m - sum)
@@ -88,13 +91,8 @@ class BankAccountActor extends Transactor {
case Log(start, finish) =>
self.reply(txnLog.slice(start, finish).map(new String(_)))
}
-}
-@serializable class PersistentFailerActor extends Transactor {
- def receive = {
- case "Failure" =>
- throw new RuntimeException("Expected exception; to test fault-tolerance")
- }
+ def fail = throw new RuntimeException("Expected exception; to test fault-tolerance")
}
@RunWith(classOf[JUnitRunner])
@@ -119,11 +117,9 @@ Spec with
log.info("Succesful Debit starting")
val bactor = actorOf[BankAccountActor]
bactor.start
- val failer = actorOf[PersistentFailerActor]
- failer.start
bactor !! Credit("a-123", 5000)
log.info("credited")
- bactor !! Debit("a-123", 3000, failer)
+ bactor !! Debit("a-123", 3000)
log.info("debited")
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(2000)
log.info("balane matched")
@@ -131,7 +127,7 @@ Spec with
log.info("Credited")
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(9000)
log.info("Balance matched")
- bactor !! Debit("a-123", 8000, failer)
+ bactor !! Debit("a-123", 8000)
log.info("Debited")
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(1000)
log.info("Balance matched")
@@ -144,12 +140,10 @@ Spec with
it("debit should fail") {
val bactor = actorOf[BankAccountActor]
bactor.start
- val failer = actorOf[PersistentFailerActor]
- failer.start
bactor !! Credit("a-123", 5000)
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
evaluating {
- bactor !! Debit("a-123", 7000, failer)
+ bactor !! Debit("a-123", 7000)
} should produce[Exception]
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
(bactor !! LogSize).get.asInstanceOf[Int] should equal(3)
@@ -160,12 +154,10 @@ Spec with
it("multidebit should fail") {
val bactor = actorOf[BankAccountActor]
bactor.start
- val failer = actorOf[PersistentFailerActor]
- failer.start
bactor !! Credit("a-123", 5000)
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
evaluating {
- bactor !! MultiDebit("a-123", List(1000, 2000, 4000), failer)
+ bactor !! MultiDebit("a-123", List(1000, 2000, 4000))
} should produce[Exception]
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
(bactor !! LogSize).get.asInstanceOf[Int] should equal(3)
diff --git a/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java b/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java
index e4da0ea515..90e141546c 100644
--- a/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java
+++ b/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java
@@ -1076,50 +1076,43 @@ public final class RemoteProtocol {
public boolean hasSerializerClassname() { return hasSerializerClassname; }
public java.lang.String getSerializerClassname() { return serializerClassname_; }
- // optional bool isTransactor = 7;
- public static final int ISTRANSACTOR_FIELD_NUMBER = 7;
- private boolean hasIsTransactor;
- private boolean isTransactor_ = false;
- public boolean hasIsTransactor() { return hasIsTransactor; }
- public boolean getIsTransactor() { return isTransactor_; }
-
- // optional uint64 timeout = 8;
- public static final int TIMEOUT_FIELD_NUMBER = 8;
+ // optional uint64 timeout = 7;
+ public static final int TIMEOUT_FIELD_NUMBER = 7;
private boolean hasTimeout;
private long timeout_ = 0L;
public boolean hasTimeout() { return hasTimeout; }
public long getTimeout() { return timeout_; }
- // optional uint64 receiveTimeout = 9;
- public static final int RECEIVETIMEOUT_FIELD_NUMBER = 9;
+ // optional uint64 receiveTimeout = 8;
+ public static final int RECEIVETIMEOUT_FIELD_NUMBER = 8;
private boolean hasReceiveTimeout;
private long receiveTimeout_ = 0L;
public boolean hasReceiveTimeout() { return hasReceiveTimeout; }
public long getReceiveTimeout() { return receiveTimeout_; }
- // optional .LifeCycleProtocol lifeCycle = 10;
- public static final int LIFECYCLE_FIELD_NUMBER = 10;
+ // optional .LifeCycleProtocol lifeCycle = 9;
+ public static final int LIFECYCLE_FIELD_NUMBER = 9;
private boolean hasLifeCycle;
private akka.remote.protocol.RemoteProtocol.LifeCycleProtocol lifeCycle_;
public boolean hasLifeCycle() { return hasLifeCycle; }
public akka.remote.protocol.RemoteProtocol.LifeCycleProtocol getLifeCycle() { return lifeCycle_; }
- // optional .RemoteActorRefProtocol supervisor = 11;
- public static final int SUPERVISOR_FIELD_NUMBER = 11;
+ // optional .RemoteActorRefProtocol supervisor = 10;
+ public static final int SUPERVISOR_FIELD_NUMBER = 10;
private boolean hasSupervisor;
private akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol supervisor_;
public boolean hasSupervisor() { return hasSupervisor; }
public akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol getSupervisor() { return supervisor_; }
- // optional bytes hotswapStack = 12;
- public static final int HOTSWAPSTACK_FIELD_NUMBER = 12;
+ // optional bytes hotswapStack = 11;
+ public static final int HOTSWAPSTACK_FIELD_NUMBER = 11;
private boolean hasHotswapStack;
private com.google.protobuf.ByteString hotswapStack_ = com.google.protobuf.ByteString.EMPTY;
public boolean hasHotswapStack() { return hasHotswapStack; }
public com.google.protobuf.ByteString getHotswapStack() { return hotswapStack_; }
- // repeated .RemoteMessageProtocol messages = 13;
- public static final int MESSAGES_FIELD_NUMBER = 13;
+ // repeated .RemoteMessageProtocol messages = 12;
+ public static final int MESSAGES_FIELD_NUMBER = 12;
private java.util.List