diff --git a/akka-actors/pom.xml b/akka-actors/pom.xml index 3b8564235a..260e357b45 100644 --- a/akka-actors/pom.xml +++ b/akka-actors/pom.xml @@ -146,24 +146,4 @@ test - - - - - false - ../config - - akka.conf - akka-reference.conf - - - - false - src/main/resources - - META-INF/* - - - - diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index ae474b5df2..f89edefc8a 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -4,21 +4,22 @@ package se.scalablesolutions.akka.actor -import com.google.protobuf.ByteString - import java.net.InetSocketAddress import java.util.HashSet -import reactor._ -import config.ScalaConfig._ -import stm.TransactionManagement -import nio.protobuf.RemoteProtocol.RemoteRequest -import nio.{RemoteProtocolBuilder, RemoteClient, RemoteServer, RemoteRequestIdFactory} -import serialization.{Serializer, Serializable, SerializationProtocol} -import util.Helpers.ReadWriteLock -import util.Logging +import se.scalablesolutions.akka.Config._ +import se.scalablesolutions.akka.reactor._ +import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.stm.TransactionManagement._ +import se.scalablesolutions.akka.stm.TransactionManagement +import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest +import se.scalablesolutions.akka.nio.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory} +import se.scalablesolutions.akka.serialization.Serializer +import se.scalablesolutions.akka.util.Helpers.ReadWriteLock +import se.scalablesolutions.akka.util.Logging import org.multiverse.utils.TransactionThreadLocal._ +import org.multiverse.api.exceptions.StmException sealed abstract class LifecycleMessage case class Init(config: AnyRef) extends LifecycleMessage @@ -46,7 +47,6 @@ class ActorMessageInvoker(val actor: Actor) extends MessageInvoker { * @author Jonas Bonér */ object Actor { - import Config._ val TIMEOUT = config.getInt("akka.actor.timeout", 5000) val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) } @@ -68,7 +68,7 @@ trait Actor extends Logging with TransactionManagement { @volatile protected[this] var remoteAddress: Option[InetSocketAddress] = None @volatile protected[akka] var supervisor: Option[Actor] = None - protected[Actor] var mailbox: MessageQueue = _ + protected[akka] var mailbox: MessageQueue = _ protected[this] var senderFuture: Option[CompletableFutureResult] = None protected[this] val linkedActors = new HashSet[Actor] protected[actor] var lifeCycleConfig: Option[LifeCycle] = None @@ -443,15 +443,14 @@ trait Actor extends Logging with TransactionManagement { RemoteProtocolBuilder.setMessage(message, requestBuilder) RemoteClient.clientFor(remoteAddress.get).send(requestBuilder.build) } else { - val handle = new MessageInvocation(this, message, None, TransactionManagement.threadBoundTx.get) - mailbox.append(handle) - latestMessage = Some(handle) + val handle = new MessageInvocation(this, message, None, currentTransaction.get) + handle.send } } private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime if (remoteAddress.isDefined) { - val requestBuilder = RemoteRequest.newBuilder + val requestBuilder = RemoteRequest.newBuilder .setId(RemoteRequestIdFactory.nextId) .setTarget(this.getClass.getName) .setTimeout(timeout) @@ -466,9 +465,8 @@ trait Actor extends Logging with TransactionManagement { else throw new IllegalStateException("Expected a future from remote call to actor " + toString) } else { val future = new DefaultCompletableFutureResult(timeout) - val handle = new MessageInvocation(this, message, Some(future), TransactionManagement.threadBoundTx.get) - mailbox.append(handle) - latestMessage = Some(handle) + val handle = new MessageInvocation(this, message, Some(future), currentTransaction.get) + handle.send future } } @@ -483,7 +481,7 @@ trait Actor extends Logging with TransactionManagement { private def dispatch[T](messageHandle: MessageInvocation) = { if (messageHandle.tx.isDefined) { - TransactionManagement.threadBoundTx.set(messageHandle.tx) + currentTransaction.set(messageHandle.tx) setThreadLocalTransaction(messageHandle.tx.get.transaction) } val message = messageHandle.message //serializeMessage(messageHandle.message) @@ -499,14 +497,14 @@ trait Actor extends Logging with TransactionManagement { if (future.isDefined) future.get.completeWithException(this, e) else e.printStackTrace } finally { - TransactionManagement.threadBoundTx.set(None) + currentTransaction.set(None) setThreadLocalTransaction(null) } } private def transactionalDispatch[T](messageHandle: MessageInvocation) = { if (messageHandle.tx.isDefined) { - TransactionManagement.threadBoundTx.set(messageHandle.tx) + currentTransaction.set(messageHandle.tx) setThreadLocalTransaction(messageHandle.tx.get.transaction) } @@ -514,35 +512,59 @@ trait Actor extends Logging with TransactionManagement { val future = messageHandle.future try { - if (!tryToCommitTransaction && isTransactionTopLevel) handleCollision + tryToCommitTransactions - if (TransactionManagement.threadBoundTx.get.isDefined && !TransactionManagement.threadBoundTx.get.get.isActive) { - TransactionManagement.threadBoundTx.set(None) // need to clear threadBoundTx before call to supervisor + if (currentTransaction.get.isDefined && !currentTransaction.get.get.isActive) { + currentTransaction.set(None) // need to clear currentTransaction before call to supervisor setThreadLocalTransaction(null) } if (isInExistingTransaction) joinExistingTransaction - else if (isTransactional) startNewTransaction + else if (isTransactional) startNewTransaction(messageHandle) incrementTransaction senderFuture = future - if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function - else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString) + if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function + else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString) + decrementTransaction } catch { - case e => - rollback(activeTx) - TransactionManagement.threadBoundTx.set(None) // need to clear threadBoundTx before call to supervisor + case e: StmException => + e.printStackTrace + decrementTransaction + + val tx = currentTransaction.get + rollback(tx) + + if (!(tx.isDefined && tx.get.isTopLevel)) { + val done = tx.get.retry + if (done) { + if (future.isDefined) future.get.completeWithException(this, e) + else e.printStackTrace + } + } + currentTransaction.set(None) // need to clear currentTransaction before call to supervisor setThreadLocalTransaction(null) - // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client - if (supervisor.isDefined) supervisor.get ! Exit(this, e) + + case e => + e.printStackTrace + decrementTransaction + + val tx = currentTransaction.get + rollback(tx) + if (future.isDefined) future.get.completeWithException(this, e) else e.printStackTrace + + currentTransaction.set(None) // need to clear currentTransaction before call to supervisor + setThreadLocalTransaction(null) + + // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client + if (supervisor.isDefined) supervisor.get ! Exit(this, e) + } finally { - decrementTransaction - if (isTransactionAborted) removeTransactionIfTopLevel - else tryToPrecommitTransaction - rescheduleClashedMessages - TransactionManagement.threadBoundTx.set(None) + if (currentTransaction.get.isDefined && currentTransaction.get.get.isAborted) removeTransactionIfTopLevel(currentTransaction.get.get) + else tryToPrecommitTransactions + currentTransaction.set(None) setThreadLocalTransaction(null) } } @@ -551,14 +573,6 @@ trait Actor extends Logging with TransactionManagement { if (future.exception.isDefined) throw future.exception.get._2 else future.result.asInstanceOf[Option[T]] - private def rescheduleClashedMessages = if (messageToReschedule.isDefined) { - val handle = messageToReschedule.get - val newTx = startNewTransaction - val clone = new MessageInvocation(handle.sender, handle.message, handle.future, newTx) - log.debug("Rescheduling message %s", clone) - mailbox.append(clone) // FIXME append or prepend rescheduled messages? - } - private def base: PartialFunction[Any, Unit] = lifeCycle orElse (hotswap getOrElse receive) private val lifeCycle: PartialFunction[Any, Unit] = { @@ -588,7 +602,7 @@ trait Actor extends Logging with TransactionManagement { private[Actor] def restart(reason: AnyRef) = synchronized { lifeCycleConfig match { - case None => throw new IllegalStateException("Server [" + id + "] does not have a life-cycle defined.") + case None => throw new IllegalStateException("Actor [" + id + "] does not have a life-cycle defined.") // FIXME implement support for shutdown time case Some(LifeCycle(scope, shutdownTime, _)) => { @@ -605,10 +619,10 @@ trait Actor extends Logging with TransactionManagement { // log.debug("Restarting actor [%s] configured as TEMPORARY (since exited naturally).", id) // scheduleRestart // } else - log.info("Server [%s] configured as TEMPORARY will not be restarted (received unnatural exit message).", id) + log.info("Actor [%s] configured as TEMPORARY will not be restarted (received unnatural exit message).", id) case Transient => - log.info("Server [%s] configured as TRANSIENT will not be restarted.", id) + log.info("Actor [%s] configured as TRANSIENT will not be restarted.", id) } } } @@ -644,7 +658,7 @@ trait Actor extends Logging with TransactionManagement { !message.isInstanceOf[Tuple6[_,_,_,_,_,_]] && !message.isInstanceOf[Tuple7[_,_,_,_,_,_,_]] && !message.isInstanceOf[Tuple8[_,_,_,_,_,_,_,_]] && - !message.isInstanceOf[Array[_]] && + !message.getClass.isArray && !message.isInstanceOf[List[_]] && !message.isInstanceOf[scala.collection.immutable.Map[_,_]] && !message.isInstanceOf[scala.collection.immutable.Set[_]] && diff --git a/akka-actors/src/main/scala/nio/RemoteServer.scala b/akka-actors/src/main/scala/nio/RemoteServer.scala index b13b234fc4..e891d015e5 100755 --- a/akka-actors/src/main/scala/nio/RemoteServer.scala +++ b/akka-actors/src/main/scala/nio/RemoteServer.scala @@ -8,11 +8,10 @@ import java.lang.reflect.InvocationTargetException import java.net.InetSocketAddress import java.util.concurrent.{ConcurrentHashMap, Executors} -import actor._ -import util._ -import protobuf.RemoteProtocol -import protobuf.RemoteProtocol.{RemoteReply, RemoteRequest} -import serialization.{Serializer, Serializable, SerializationProtocol} +import se.scalablesolutions.akka.actor._ +import se.scalablesolutions.akka.util._ +import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.{RemoteReply, RemoteRequest} +import se.scalablesolutions.akka.Config.config import org.jboss.netty.bootstrap.ServerBootstrap import org.jboss.netty.channel._ @@ -31,7 +30,6 @@ class RemoteServer extends Logging { * @author Jonas Bonér */ object RemoteServer extends Logging { - import Config.config val HOSTNAME = config.getString("akka.remote.hostname", "localhost") val PORT = config.getInt("akka.remote.port", 9999) val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.connection-timeout", 1000) @@ -70,7 +68,7 @@ class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader]) ext def getPipeline: ChannelPipeline = { val p = Channels.pipeline() p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)) - p.addLast("protobufDecoder", new ProtobufDecoder(RemoteProtocol.RemoteRequest.getDefaultInstance)) + p.addLast("protobufDecoder", new ProtobufDecoder(RemoteRequest.getDefaultInstance)) p.addLast("frameEncoder", new LengthFieldPrepender(4)) p.addLast("protobufEncoder", new ProtobufEncoder) p.addLast("handler", new RemoteServerHandler(name, loader)) @@ -234,6 +232,7 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL val activeObjectOrNull = activeObjects.get(name) if (activeObjectOrNull == null) { try { + log.info("Creating a new remote active object [%s]", name) val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) else Class.forName(name) val newInstance = activeObjectFactory.newInstance(clazz, timeout).asInstanceOf[AnyRef] @@ -252,6 +251,7 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL val actorOrNull = actors.get(name) if (actorOrNull == null) { try { + log.info("Creating a new remote actor [%s]", name) val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) else Class.forName(name) val newInstance = clazz.newInstance.asInstanceOf[Actor] diff --git a/akka-actors/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala b/akka-actors/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala index 90518dbd73..de200a007d 100644 --- a/akka-actors/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala +++ b/akka-actors/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala @@ -26,7 +26,7 @@ class EventBasedSingleThreadDispatcher(name: String) extends MessageDispatcherBa val iter = selectedInvocations.iterator while (iter.hasNext) { val invocation = iter.next - val invoker = messageHandlers.get(invocation.sender) + val invoker = messageHandlers.get(invocation.receiver) if (invoker != null) invoker.invoke(invocation) iter.remove } diff --git a/akka-actors/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala b/akka-actors/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala index c63237f12c..18a9286402 100644 --- a/akka-actors/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala +++ b/akka-actors/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala @@ -97,7 +97,7 @@ class EventBasedThreadPoolDispatcher(name: String, private val concurrentMode: B threadPoolBuilder.execute(new Runnable() { def run = { invoker.invoke(invocation) - free(invocation.sender) + free(invocation.receiver) messageDemultiplexer.wakeUp } }) @@ -119,16 +119,16 @@ class EventBasedThreadPoolDispatcher(name: String, private val concurrentMode: B while (iterator.hasNext) { val invocation = iterator.next if (concurrentMode) { - val invoker = messageHandlers.get(invocation.sender) + val invoker = messageHandlers.get(invocation.receiver) if (invocation == null) throw new IllegalStateException("Message invocation is null [" + invocation + "]") if (invoker == null) throw new IllegalStateException("Message invoker for invocation [" + invocation + "] is null") result.put(invocation, invoker) - } else if (!busyInvokers.contains(invocation.sender)) { - val invoker = messageHandlers.get(invocation.sender) + } else if (!busyInvokers.contains(invocation.receiver)) { + val invoker = messageHandlers.get(invocation.receiver) if (invocation == null) throw new IllegalStateException("Message invocation is null [" + invocation + "]") if (invoker == null) throw new IllegalStateException("Message invoker for invocation [" + invocation + "] is null") result.put(invocation, invoker) - busyInvokers.add(invocation.sender) + busyInvokers.add(invocation.receiver) iterator.remove } } diff --git a/akka-actors/src/main/scala/reactor/Reactor.scala b/akka-actors/src/main/scala/reactor/Reactor.scala index a4bb3e3784..1f4716551e 100644 --- a/akka-actors/src/main/scala/reactor/Reactor.scala +++ b/akka-actors/src/main/scala/reactor/Reactor.scala @@ -5,8 +5,12 @@ package se.scalablesolutions.akka.reactor import java.util.List -import stm.Transaction -import util.HashCode + +import se.scalablesolutions.akka.util.HashCode +import se.scalablesolutions.akka.stm.Transaction +import se.scalablesolutions.akka.actor.Actor + +import java.util.concurrent.atomic.AtomicInteger trait MessageQueue { def append(handle: MessageInvocation) @@ -32,23 +36,35 @@ trait MessageDemultiplexer { def wakeUp } -class MessageInvocation(val sender: AnyRef, +class MessageInvocation(val receiver: Actor, val message: AnyRef, val future: Option[CompletableFutureResult], val tx: Option[Transaction]) { + if (receiver == null) throw new IllegalArgumentException("receiver is null") + if (message == null) throw new IllegalArgumentException("message is null") - override def hashCode(): Int = { + private [akka] val nrOfDeliveryAttempts = new AtomicInteger(0) + + def send = synchronized { + receiver.mailbox.append(this) + nrOfDeliveryAttempts.incrementAndGet + } + + override def hashCode(): Int = synchronized { var result = HashCode.SEED - result = HashCode.hash(result, sender) + result = HashCode.hash(result, receiver) result = HashCode.hash(result, message) result } - override def equals(that: Any): Boolean = + override def equals(that: Any): Boolean = synchronized { that != null && that.isInstanceOf[MessageInvocation] && - that.asInstanceOf[MessageInvocation].sender == sender && + that.asInstanceOf[MessageInvocation].receiver == receiver && that.asInstanceOf[MessageInvocation].message == message - - override def toString(): String = "MessageInvocation[message = " + message + ", sender = " + sender + ", future = " + future + ", tx = " + tx + "]" + } + + override def toString(): String = synchronized { + "MessageInvocation[message = " + message + ", receiver = " + receiver + ", future = " + future + ", tx = " + tx + "]" + } } diff --git a/akka-actors/src/main/scala/stm/Transaction.scala b/akka-actors/src/main/scala/stm/Transaction.scala index a847f4edca..a2c6e40c78 100644 --- a/akka-actors/src/main/scala/stm/Transaction.scala +++ b/akka-actors/src/main/scala/stm/Transaction.scala @@ -4,10 +4,12 @@ package se.scalablesolutions.akka.stm -import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} -import state.Transactional -import util.Logging -import actor.Actor +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.AtomicInteger + +import se.scalablesolutions.akka.reactor.MessageInvocation +import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.state.Committable import org.multiverse.api.{Transaction => MultiverseTransaction} import org.multiverse.stms.alpha.AlphaStm @@ -15,7 +17,9 @@ import org.multiverse.utils.GlobalStmInstance import org.multiverse.utils.TransactionThreadLocal._ import org.multiverse.templates.{OrElseTemplate, AtomicTemplate} -import java.util.concurrent.atomic.AtomicInteger +import scala.collection.mutable.HashMap + +class TransactionRetryException(message: String) extends RuntimeException(message) /** * @author Jonas Bonér @@ -75,28 +79,29 @@ object Transaction { @volatile private[this] var status: TransactionStatus = TransactionStatus.New private[akka] var transaction: MultiverseTransaction = _ -// private[this] var initMessage: Option[AnyRef] = None -// private[this] var initReceiver: Option[Actor] = None - + private[this] var message: Option[MessageInvocation] = None + private[this] var participants: List[String] = Nil private[this] var precommitted: List[String] = Nil - private[this] val depth = new AtomicInteger(0) + private[this] val persistentStateMap = new HashMap[String, Committable] + + private[akka] val depth = new AtomicInteger(0) def increment = depth.incrementAndGet def decrement = depth.decrementAndGet - def isTopLevel = depth.compareAndSet(0, 0) - - def begin(participant: String) = synchronized { -// def begin(participant: String, message, receiver) = synchronized { - ensureIsActiveOrNew -// initMessage = Some(message) -// initReceiver = Some(receiver) - transaction = Multiverse.STM.startUpdateTransaction("akka") - log.debug("Creating a new transaction with id [%s]", id) + def isTopLevel = depth.get == 0 - if (status == TransactionStatus.New) log.debug("TX BEGIN - Server with UUID [%s] is starting NEW transaction [%s]", participant, toString) - else log.debug("Server [%s] is participating in transaction", participant) + def register(uuid: String, storage: Committable) = persistentStateMap.put(uuid, storage) + + def begin(participant: String, msg: MessageInvocation) = synchronized { + ensureIsActiveOrNew + message = Some(msg) + transaction = Multiverse.STM.startUpdateTransaction("akka") + log.debug("TX BEGIN - Creating a new transaction with id [%s]", id) + + if (status == TransactionStatus.New) log.debug("TX BEGIN - Actor with UUID [%s] is starting NEW transaction [%s]", participant, toString) + else log.debug("Actor [%s] is participating in transaction", participant) participants ::= participant status = TransactionStatus.Active } @@ -109,8 +114,9 @@ object Transaction { } def commit(participant: String): Boolean = synchronized { + log.debug("TX COMMIT - Trying to commit transaction [%s] for server with UUID [%s]", toString, participant) + setThreadLocalTransaction(transaction) if (status == TransactionStatus.Active) { - log.debug("TX COMMIT - Committing transaction [%s] for server with UUID [%s]", toString, participant) val haveAllPreCommitted = if (participants.size == precommitted.size) {{ for (part <- participants) yield { @@ -119,50 +125,62 @@ object Transaction { }}.exists(_ == true) } else false if (haveAllPreCommitted && transaction != null) { + log.debug("TX COMMIT - Committing transaction [%s] for server with UUID [%s]", toString, participant) transaction.commit - transaction.reset - status = TransactionStatus.Completed reset + status = TransactionStatus.Completed + Transaction.Atomic { + persistentStateMap.values.foreach(_.commit) + } true } else false } else { - reset true } } def rollback(participant: String) = synchronized { ensureIsActiveOrAborted - log.debug("TX ROLLBACK - Server with UUID [%s] has initiated transaction rollback for [%s]", participant, toString) + log.debug("TX ROLLBACK - Actor with UUID [%s] has initiated transaction rollback for [%s]", participant, toString) status = TransactionStatus.Aborted transaction.abort - transaction.reset reset } def rollbackForRescheduling(participant: String) = synchronized { ensureIsActiveOrAborted - log.debug("TX ROLLBACK for recheduling - Server with UUID [%s] has initiated transaction rollback for [%s]", participant, toString) + log.debug("TX ROLLBACK for recheduling - Actor with UUID [%s] has initiated transaction rollback for [%s]", participant, toString) transaction.abort reset } def join(participant: String) = synchronized { ensureIsActive - log.debug("TX JOIN - Server with UUID [%s] is joining transaction [%s]" , participant, toString) + log.debug("TX JOIN - Actor with UUID [%s] is joining transaction [%s]" , participant, toString) participants ::= participant } + def retry: Boolean = synchronized { + println("----- 2 " + message.isDefined) + println("----- 3 " + message.get.nrOfDeliveryAttempts) + if (message.isDefined && message.get.nrOfDeliveryAttempts.get < TransactionManagement.MAX_NR_OF_RETRIES) { + log.debug("TX RETRY - Restarting transaction [%s] resending message [%s]", transaction, message.get) + message.get.send + true + } else false + } + + private def reset = synchronized { + transaction.reset + participants = Nil + precommitted = Nil + } + def isNew = synchronized { status == TransactionStatus.New } def isActive = synchronized { status == TransactionStatus.Active } def isCompleted = synchronized { status == TransactionStatus.Completed } def isAborted = synchronized { status == TransactionStatus.Aborted } - private def reset = synchronized { - participants = Nil - precommitted = Nil - } - private def ensureIsActive = if (status != TransactionStatus.Active) throw new IllegalStateException("Expected ACTIVE transaction - current status [" + status + "]: " + toString) diff --git a/akka-actors/src/main/scala/stm/TransactionManagement.scala b/akka-actors/src/main/scala/stm/TransactionManagement.scala index 6cabdaa9d9..c2f4db76b0 100644 --- a/akka-actors/src/main/scala/stm/TransactionManagement.scala +++ b/akka-actors/src/main/scala/stm/TransactionManagement.scala @@ -6,9 +6,14 @@ package se.scalablesolutions.akka.stm import java.util.concurrent.atomic.AtomicBoolean -import reactor.MessageInvocation -import util.Logging -import org.codehaus.aspectwerkz.proxy.Uuid // FIXME is java.util.UUID better? +import se.scalablesolutions.akka.reactor.MessageInvocation +import se.scalablesolutions.akka.util.Logging + +import org.codehaus.aspectwerkz.proxy.Uuid + +import scala.collection.mutable.HashSet + +// FIXME is java.util.UUID better? import org.multiverse.utils.TransactionThreadLocal._ @@ -19,9 +24,10 @@ class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Tran } object TransactionManagement { - import Config._ + import se.scalablesolutions.akka.Config._ val TIME_WAITING_FOR_COMPLETION = config.getInt("akka.stm.wait-for-completion", 100) val NR_OF_TIMES_WAITING_FOR_COMPLETION = config.getInt("akka.stm.wait-nr-of-times", 3) + val MAX_NR_OF_RETRIES = config.getInt("akka.stm.max-nr-of-retries", 10) val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", false)) // FIXME reenable 'akka.stm.restart-on-collision' when new STM is in place val RESTART_TRANSACTION_ON_COLLISION = false //akka.Kernel.config.getBool("akka.stm.restart-on-collision", true) @@ -29,7 +35,7 @@ object TransactionManagement { def isTransactionalityEnabled = TRANSACTION_ENABLED.get def disableTransactions = TRANSACTION_ENABLED.set(false) - private[akka] val threadBoundTx: ThreadLocal[Option[Transaction]] = new ThreadLocal[Option[Transaction]]() { + private[akka] val currentTransaction: ThreadLocal[Option[Transaction]] = new ThreadLocal[Option[Transaction]]() { override protected def initialValue: Option[Transaction] = None } } @@ -37,41 +43,41 @@ object TransactionManagement { trait TransactionManagement extends Logging { var uuid = Uuid.newUuid.toString - protected[this] var latestMessage: Option[MessageInvocation] = None - protected[this] var messageToReschedule: Option[MessageInvocation] = None + import TransactionManagement.currentTransaction + private[akka] val activeTransactions = new HashSet[Transaction] - import TransactionManagement.threadBoundTx - private[akka] var activeTx: Option[Transaction] = None - - protected def startNewTransaction: Option[Transaction] = { + protected def startNewTransaction(message: MessageInvocation) = { val newTx = new Transaction - newTx.begin(uuid) - val tx = Some(newTx) - activeTx = tx - threadBoundTx.set(tx) - setThreadLocalTransaction(tx.get.transaction) - tx + newTx.begin(uuid, message) + activeTransactions += newTx + currentTransaction.set(Some(newTx)) + setThreadLocalTransaction(newTx.transaction) } protected def joinExistingTransaction = { - val cflowTx = threadBoundTx.get - if (!activeTx.isDefined && cflowTx.isDefined) { + val cflowTx = currentTransaction.get + if (activeTransactions.isEmpty && cflowTx.isDefined) { val currentTx = cflowTx.get currentTx.join(uuid) - activeTx = Some(currentTx) + activeTransactions += currentTx + } + } + + protected def tryToPrecommitTransactions = activeTransactions.foreach(_.precommit(uuid)) + + protected def tryToCommitTransactions = { + for (tx <- activeTransactions) { + if (tx.commit(uuid)) activeTransactions -= tx + else if (tx.isTopLevel) { + println("------------ COULD NOT COMMIT -- WAITING OR TIMEOUT? ---------") + //tx.retry + } else { + // continue, try to commit on next received message + // FIXME check if TX hase timed out => throw exception + } } } - - protected def tryToPrecommitTransaction = if (activeTx.isDefined) activeTx.get.precommit(uuid) - - protected def tryToCommitTransaction: Boolean = if (activeTx.isDefined) { - val tx = activeTx.get - if (tx.commit(uuid)) { - removeTransactionIfTopLevel - true - } else false - } else true - + protected def rollback(tx: Option[Transaction]) = tx match { case None => {} // no tx; nothing to do case Some(tx) => @@ -84,39 +90,12 @@ trait TransactionManagement extends Logging { tx.rollbackForRescheduling(uuid) } - protected def handleCollision = { - var nrRetries = 0 - var failed = true - do { - Thread.sleep(TransactionManagement.TIME_WAITING_FOR_COMPLETION) - nrRetries += 1 - log.debug("Pending transaction [%s] not completed, waiting %s milliseconds. Attempt %s", activeTx.get.id, TransactionManagement.TIME_WAITING_FOR_COMPLETION, nrRetries) - failed = !tryToCommitTransaction - } while(nrRetries < TransactionManagement.NR_OF_TIMES_WAITING_FOR_COMPLETION && failed) - if (failed) { - log.debug("Pending transaction [%s] still not completed, aborting and rescheduling message [%s]", activeTx.get.id, latestMessage) - rollback(activeTx) - if (TransactionManagement.RESTART_TRANSACTION_ON_COLLISION) messageToReschedule = Some(latestMessage.get) - else throw new TransactionRollbackException("Conflicting transactions, rolling back transaction for message [" + latestMessage + "]") - } - } + protected def isInExistingTransaction = currentTransaction.get.isDefined - protected def isInExistingTransaction = TransactionManagement.threadBoundTx.get.isDefined + protected def incrementTransaction = if (currentTransaction.get.isDefined) currentTransaction.get.get.increment - protected def isTransactionTopLevel = activeTx.isDefined && activeTx.get.isTopLevel - - protected def isTransactionAborted = activeTx.isDefined && activeTx.get.isAborted + protected def decrementTransaction = if (currentTransaction.get.isDefined) currentTransaction.get.get.decrement - protected def incrementTransaction = if (activeTx.isDefined) activeTx.get.increment - - protected def decrementTransaction = if (activeTx.isDefined) activeTx.get.decrement - - protected def removeTransactionIfTopLevel = if (isTransactionTopLevel) activeTx = None - - protected def reenteringExistingTransaction= if (activeTx.isDefined) { - val cflowTx = threadBoundTx.get - if (cflowTx.isDefined && cflowTx.get.id == activeTx.get.id) false - else true - } else true + protected def removeTransactionIfTopLevel(tx: Transaction) = if (tx.isTopLevel) { activeTransactions -= tx } } diff --git a/akka-actors/src/main/scala/stm/TransactionalState.scala b/akka-actors/src/main/scala/stm/TransactionalState.scala index 29c54d273f..690e351812 100644 --- a/akka-actors/src/main/scala/stm/TransactionalState.scala +++ b/akka-actors/src/main/scala/stm/TransactionalState.scala @@ -45,7 +45,14 @@ object TransactionalState { @serializable trait Transactional { // FIXME: won't work across the cluster - val uuid = Uuid.newUuid.toString + var uuid = Uuid.newUuid.toString +} + +/** + * @author Jonas Bonér + */ +trait Committable { + def commit: Unit } /** @@ -71,17 +78,15 @@ class TransactionalRef[T] extends Transactional { def swap(elem: T) = ref.set(elem) def get: Option[T] = { -// if (ref.isNull) None - // else - Some(ref.get) + if (ref.isNull) None + else Some(ref.get) } def getOrWait: T = ref.getOrAwait def getOrElse(default: => T): T = { -// if (ref.isNull) default - //else - ref.get + if (ref.isNull) default + else ref.get } def isDefined: Boolean = !ref.isNull diff --git a/akka-actors/src/main/scala/stm/Vector.scala b/akka-actors/src/main/scala/stm/Vector.scala index d743c82ddb..27729c0a07 100644 --- a/akka-actors/src/main/scala/stm/Vector.scala +++ b/akka-actors/src/main/scala/stm/Vector.scala @@ -298,7 +298,7 @@ class Vector[+T] private (val length: Int, shift: Int, root: Array[AnyRef], tail } override def equals(other: Any) = other match { - case vec: Vector[T] => { + case vec: Vector[_] => { var back = length == vec.length var i = 0 diff --git a/akka-actors/src/test/scala/EventBasedSingleThreadDispatcherTest.scala b/akka-actors/src/test/scala/EventBasedSingleThreadDispatcherTest.scala index 758f9d6cd0..9aba70fc48 100644 --- a/akka-actors/src/test/scala/EventBasedSingleThreadDispatcherTest.scala +++ b/akka-actors/src/test/scala/EventBasedSingleThreadDispatcherTest.scala @@ -8,6 +8,7 @@ import java.util.concurrent.locks.ReentrantLock import org.junit.{Test, Before} import org.junit.Assert._ import junit.framework.TestCase +import se.scalablesolutions.akka.actor.Actor class EventBasedSingleThreadDispatcherTest extends TestCase { private var threadingIssueDetected: AtomicBoolean = null @@ -51,15 +52,18 @@ class EventBasedSingleThreadDispatcherTest extends TestCase { internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder } + val key1 = new Actor { def receive: PartialFunction[Any, Unit] = { case _ => {}} } + val key2 = new Actor { def receive: PartialFunction[Any, Unit] = { case _ => {}} } + val key3 = new Actor { def receive: PartialFunction[Any, Unit] = { case _ => {}} } + private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = { val guardLock = new ReentrantLock val handleLatch = new CountDownLatch(100) - val key = "key" val dispatcher = new EventBasedSingleThreadDispatcher("name") - dispatcher.registerHandler(key, new TestMessageHandle(handleLatch)) + dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch)) dispatcher.start for (i <- 0 until 100) { - dispatcher.messageQueue.append(new MessageInvocation(key, new Object, None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None)) } assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertFalse(threadingIssueDetected.get) @@ -67,8 +71,6 @@ class EventBasedSingleThreadDispatcherTest extends TestCase { private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedSequentially: Unit = { val handleLatch = new CountDownLatch(2) - val key1 = "key1" - val key2 = "key2" val dispatcher = new EventBasedSingleThreadDispatcher("name") dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch)) dispatcher.registerHandler(key2, new TestMessageHandle(handleLatch)) @@ -81,8 +83,6 @@ class EventBasedSingleThreadDispatcherTest extends TestCase { private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = { val handleLatch = new CountDownLatch(200) - val key1 = "key1" - val key2 = "key2" val dispatcher = new EventBasedSingleThreadDispatcher("name") dispatcher.registerHandler(key1, new MessageInvoker { var currentValue = -1; diff --git a/akka-actors/src/test/scala/EventBasedThreadPoolDispatcherTest.scala b/akka-actors/src/test/scala/EventBasedThreadPoolDispatcherTest.scala index a57ad0b825..00e6ceff8b 100644 --- a/akka-actors/src/test/scala/EventBasedThreadPoolDispatcherTest.scala +++ b/akka-actors/src/test/scala/EventBasedThreadPoolDispatcherTest.scala @@ -9,9 +9,13 @@ import org.junit.Before import org.junit.Test import org.junit.Assert._ import junit.framework.TestCase +import se.scalablesolutions.akka.actor.Actor class EventBasedThreadPoolDispatcherTest extends TestCase { private var threadingIssueDetected: AtomicBoolean = null + val key1 = new Actor { def receive: PartialFunction[Any, Unit] = { case _ => {}} } + val key2 = new Actor { def receive: PartialFunction[Any, Unit] = { case _ => {}} } + val key3 = new Actor { def receive: PartialFunction[Any, Unit] = { case _ => {}} } @Before override def setUp = { @@ -36,7 +40,6 @@ class EventBasedThreadPoolDispatcherTest extends TestCase { private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = { val guardLock = new ReentrantLock val handleLatch = new CountDownLatch(10) - val key = "key" val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name") dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100) .setCorePoolSize(2) @@ -44,7 +47,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase { .setKeepAliveTimeInMillis(60000) .setRejectionPolicy(new CallerRunsPolicy) .buildThreadPool - dispatcher.registerHandler(key, new MessageInvoker { + dispatcher.registerHandler(key1, new MessageInvoker { def invoke(message: MessageInvocation) { try { if (threadingIssueDetected.get) return @@ -64,7 +67,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase { }) dispatcher.start for (i <- 0 until 10) { - dispatcher.messageQueue.append(new MessageInvocation(key, new Object, None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None)) } assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertFalse(threadingIssueDetected.get) @@ -74,8 +77,6 @@ class EventBasedThreadPoolDispatcherTest extends TestCase { val guardLock1 = new ReentrantLock val guardLock2 = new ReentrantLock val handlersBarrier = new CyclicBarrier(3) - val key1 = "key1" - val key2 = "key2" val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name") dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100) .setCorePoolSize(2) @@ -119,8 +120,6 @@ class EventBasedThreadPoolDispatcherTest extends TestCase { private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = { val handleLatch = new CountDownLatch(200) - val key1 = "key1" - val key2 = "key2" val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name") dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100) .setCorePoolSize(2) diff --git a/akka-actors/src/test/scala/InMemoryActorSpec.scala b/akka-actors/src/test/scala/InMemoryActorSpec.scala index 72bfe8c925..f242ed780c 100644 --- a/akka-actors/src/test/scala/InMemoryActorSpec.scala +++ b/akka-actors/src/test/scala/InMemoryActorSpec.scala @@ -201,7 +201,7 @@ class InMemoryActorSpec extends TestCase { stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired assertEquals("new state", (stateful !! GetRefState).get) } - + @Test def testOneWayRefShouldRollbackStateForStatefulServerInCaseOfFailure = { val stateful = new InMemStatefulActor @@ -226,6 +226,7 @@ class InMemoryActorSpec extends TestCase { stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method fail("should have thrown an exception") } catch {case e: RuntimeException => {}} + println("---------- BACK") assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state } } diff --git a/akka-actors/src/test/scala/ThreadBasedDispatcherTest.scala b/akka-actors/src/test/scala/ThreadBasedDispatcherTest.scala index a76d02815d..18daa6084e 100644 --- a/akka-actors/src/test/scala/ThreadBasedDispatcherTest.scala +++ b/akka-actors/src/test/scala/ThreadBasedDispatcherTest.scala @@ -8,10 +8,14 @@ import java.util.concurrent.locks.ReentrantLock import org.junit.{Test, Before} import org.junit.Assert._ import junit.framework.TestCase +import se.scalablesolutions.akka.actor.Actor class ThreadBasedDispatcherTest extends TestCase { private var threadingIssueDetected: AtomicBoolean = null - + val key1 = new Actor { def receive: PartialFunction[Any, Unit] = { case _ => {}} } + val key2 = new Actor { def receive: PartialFunction[Any, Unit] = { case _ => {}} } + val key3 = new Actor { def receive: PartialFunction[Any, Unit] = { case _ => {}} } + class TestMessageHandle(handleLatch: CountDownLatch) extends MessageInvoker { val guardLock: Lock = new ReentrantLock @@ -52,7 +56,7 @@ class ThreadBasedDispatcherTest extends TestCase { val dispatcher = new ThreadBasedDispatcher("name", new TestMessageHandle(handleLatch)) dispatcher.start for (i <- 0 until 100) { - dispatcher.messageQueue.append(new MessageInvocation("id", new Object, None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None)) } assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertFalse(threadingIssueDetected.get) @@ -73,7 +77,7 @@ class ThreadBasedDispatcherTest extends TestCase { }) dispatcher.start for (i <- 0 until 100) { - dispatcher.messageQueue.append(new MessageInvocation("id", new Integer(i), None, None)) + dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None)) } assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertFalse(threadingIssueDetected.get) diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java index bca67d68d1..7c6b37a0a0 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java @@ -38,32 +38,37 @@ public class InMemNestedStateTest extends TestCase { conf.stop(); } - public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { + public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() throws Exception { InMemStateful stateful = conf.getInstance(InMemStateful.class); stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state InMemStatefulNested nested = conf.getInstance(InMemStatefulNested.class); nested.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactionrequired + System.out.println("-- BACK --"); assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")); assertEquals("new state", nested.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")); } - public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() { + public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() throws InterruptedException { InMemStateful stateful = conf.getInstance(InMemStateful.class); stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state + Thread.sleep(100); InMemStatefulNested nested = conf.getInstance(InMemStatefulNested.class); nested.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state + Thread.sleep(100); InMemFailer failer = conf.getInstance(InMemFailer.class); try { stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactionrequired method + Thread.sleep(100); fail("should have thrown an exception"); } catch (RuntimeException e) { } // expected assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state + Thread.sleep(100); assertEquals("init", nested.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state } - public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { + public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() throws Exception { InMemStateful stateful = conf.getInstance(InMemStateful.class); stateful.setVectorState("init"); // set init state InMemStatefulNested nested = conf.getInstance(InMemStatefulNested.class); @@ -73,22 +78,26 @@ public class InMemNestedStateTest extends TestCase { assertEquals("new state", nested.getVectorState()); } - public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() { + public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() throws InterruptedException { InMemStateful stateful = conf.getInstance(InMemStateful.class); stateful.setVectorState("init"); // set init state + Thread.sleep(100); InMemStatefulNested nested = conf.getInstance(InMemStatefulNested.class); nested.setVectorState("init"); // set init state + Thread.sleep(100); InMemFailer failer = conf.getInstance(InMemFailer.class); try { stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactionrequired method + Thread.sleep(100); fail("should have thrown an exception"); } catch (RuntimeException e) { } // expected assertEquals("init", stateful.getVectorState()); // check that state is == init state + Thread.sleep(100); assertEquals("init", nested.getVectorState()); // check that state is == init state } - public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { + public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() throws Exception { InMemStateful stateful = conf.getInstance(InMemStateful.class); InMemStatefulNested nested = conf.getInstance(InMemStatefulNested.class); stateful.setRefState("init"); // set init state @@ -98,18 +107,22 @@ public class InMemNestedStateTest extends TestCase { assertEquals("new state", nested.getRefState()); } - public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() { + public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() throws InterruptedException { InMemStateful stateful = conf.getInstance(InMemStateful.class); InMemStatefulNested nested = conf.getInstance(InMemStatefulNested.class); stateful.setRefState("init"); // set init state + Thread.sleep(100); nested.setRefState("init"); // set init state + Thread.sleep(100); InMemFailer failer = conf.getInstance(InMemFailer.class); try { stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactionrequired method + Thread.sleep(100); fail("should have thrown an exception"); } catch (RuntimeException e) { } // expected assertEquals("init", stateful.getRefState()); // check that state is == init state + Thread.sleep(100); assertEquals("init", nested.getRefState()); // check that state is == init state } } diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java index b89a555c58..abaedf8ae9 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java @@ -6,80 +6,80 @@ import se.scalablesolutions.akka.state.*; @transactionrequired public class InMemStatefulNested { - private TransactionalMap mapState; - private TransactionalVector vectorState; - private TransactionalRef refState; - private boolean isInitialized = false; + private TransactionalMap mapState; + private TransactionalVector vectorState; + private TransactionalRef refState; + private boolean isInitialized = false; - public void init() { - if (!isInitialized) { - mapState = TransactionalState.newMap(); - vectorState = TransactionalState.newVector(); - refState = TransactionalState.newRef(); - isInitialized = true; - } + public void init() { + if (!isInitialized) { + mapState = TransactionalState.newMap(); + vectorState = TransactionalState.newVector(); + refState = TransactionalState.newRef(); + isInitialized = true; } + } - public String getMapState(String key) { - return (String) mapState.get(key).get(); - } + public String getMapState(String key) { + return (String) mapState.get(key).get(); + } - public String getVectorState() { - return (String) vectorState.last(); - } + public String getVectorState() { + return (String) vectorState.last(); + } - public String getRefState() { - return (String) refState.get().get(); - } + public String getRefState() { + return (String) refState.get().get(); + } - public void setMapState(String key, String msg) { - mapState.put(key, msg); - } + public void setMapState(String key, String msg) { + mapState.put(key, msg); + } - public void setVectorState(String msg) { - vectorState.add(msg); - } + public void setVectorState(String msg) { + vectorState.add(msg); + } - public void setRefState(String msg) { - refState.swap(msg); - } + public void setRefState(String msg) { + refState.swap(msg); + } - public void success(String key, String msg) { - mapState.put(key, msg); - vectorState.add(msg); - refState.swap(msg); - } + public void success(String key, String msg) { + mapState.put(key, msg); + vectorState.add(msg); + refState.swap(msg); + } - public String failure(String key, String msg, InMemFailer failer) { - mapState.put(key, msg); - vectorState.add(msg); - refState.swap(msg); - failer.fail(); - return msg; - } + public String failure(String key, String msg, InMemFailer failer) { + mapState.put(key, msg); + vectorState.add(msg); + refState.swap(msg); + failer.fail(); + return msg; + } - public void thisMethodHangs(String key, String msg, InMemFailer failer) { - setMapState(key, msg); - } + public void thisMethodHangs(String key, String msg, InMemFailer failer) { + setMapState(key, msg); + } - /* - public void clashOk(String key, String msg, InMemClasher clasher) { - mapState.put(key, msg); - clasher.clash(); - } + /* + public void clashOk(String key, String msg, InMemClasher clasher) { + mapState.put(key, msg); + clasher.clash(); + } - public void clashNotOk(String key, String msg, InMemClasher clasher) { - mapState.put(key, msg); - clasher.clash(); - this.success("clash", "clash"); - } - */ + public void clashNotOk(String key, String msg, InMemClasher clasher) { + mapState.put(key, msg); + clasher.clash(); + this.success("clash", "clash"); + } + */ } \ No newline at end of file diff --git a/akka-kernel/pom.xml b/akka-kernel/pom.xml index 9fcf2b9369..d14b0d2c86 100755 --- a/akka-kernel/pom.xml +++ b/akka-kernel/pom.xml @@ -158,7 +158,7 @@ org.apache.cassandra cassandra - 0.4.0-trunk + 0.4.0 com.facebook @@ -320,22 +320,5 @@ - - - false - ../config - - akka.conf - akka-reference.conf - - - - false - src/main/resources - - META-INF/* - - - diff --git a/akka-kernel/src/main/scala/AkkaServlet.scala b/akka-kernel/src/main/scala/AkkaServlet.scala index 3b24bd31e0..d5aab7e1d3 100755 --- a/akka-kernel/src/main/scala/AkkaServlet.scala +++ b/akka-kernel/src/main/scala/AkkaServlet.scala @@ -44,25 +44,19 @@ class AkkaServlet extends ServletContainer with AtmosphereServletProcessor with override def onMessage(event: AtmosphereEvent[HttpServletRequest, HttpServletResponse]): AtmosphereEvent[_, _] = { val response = event.getResponse - val data = if(event.getMessage ne null ) event.getMessage.toString else null + val data = if(event.getMessage ne null) event.getMessage.toString else null val isUsingStream = try { - response.getWriter - false - } catch { case e: IllegalStateException => true } + response.getWriter + false + } catch { case e: IllegalStateException => true } - if (isUsingStream) - { - if (data != null) - response.getOutputStream.write(data.getBytes) + if (isUsingStream) { + if (data != null) response.getOutputStream.write(data.getBytes) response.getOutputStream.flush - } - else - { - if (data != null) - response.getWriter.write(data) + } else { + if (data != null) response.getWriter.write(data) response.getWriter.flush - } - + } event } diff --git a/akka-persistence/src/main/scala/PersistentState.scala b/akka-persistence/src/main/scala/PersistentState.scala index 56096df810..7363da5be5 100644 --- a/akka-persistence/src/main/scala/PersistentState.scala +++ b/akka-persistence/src/main/scala/PersistentState.scala @@ -5,12 +5,15 @@ package se.scalablesolutions.akka.state import stm.TransactionManagement +import stm.TransactionManagement.currentTransaction import akka.collection._ import org.codehaus.aspectwerkz.proxy.Uuid import scala.collection.mutable.{ArrayBuffer, HashMap} +class NoTransactionInScopeException extends RuntimeException + sealed abstract class PersistentStateConfig abstract class PersistentStorageConfig extends PersistentStateConfig case class CassandraStorageConfig extends PersistentStorageConfig @@ -62,7 +65,7 @@ object PersistentState { * * @author Jonas Bonér */ -trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] with Transactional { +trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] with Transactional with Committable { protected val newAndUpdatedEntries = TransactionalState.newMap[AnyRef, AnyRef] protected val removedEntries = TransactionalState.newMap[AnyRef, AnyRef] protected val shouldClearOnCommit = TransactionalRef[Boolean]() @@ -70,10 +73,10 @@ trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] with Tr // to be concretized in subclasses val storage: MapStorage - private[akka] def commit = { + def commit = { storage.removeMapStorageFor(uuid, removedEntries.toList) storage.insertMapStorageEntriesFor(uuid, newAndUpdatedEntries.toList) - if (shouldClearOnCommit.isDefined & shouldClearOnCommit.get.get) storage.removeMapStorageFor(uuid) + if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get.get) storage.removeMapStorageFor(uuid) newAndUpdatedEntries.clear removedEntries.clear } @@ -82,28 +85,39 @@ trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] with Tr def +=(key: AnyRef, value: AnyRef) = put(key, value) - override def put(key: AnyRef, value: AnyRef): Option[AnyRef] = newAndUpdatedEntries.put(key, value) + override def put(key: AnyRef, value: AnyRef): Option[AnyRef] = { + register + newAndUpdatedEntries.put(key, value) + } - override def update(key: AnyRef, value: AnyRef) = newAndUpdatedEntries.update(key, value) - - def remove(key: AnyRef) = removedEntries.remove(key) + override def update(key: AnyRef, value: AnyRef) = { + register + newAndUpdatedEntries.update(key, value) + } + + def remove(key: AnyRef) = { + register + removedEntries.remove(key) + } def slice(start: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] = slice(start, None, count) def slice(start: Option[AnyRef], finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] = try { - storage.getMapStorageRangeFor(uuid, start, finish, count) - } catch { case e: Exception => Nil } - - override def clear = shouldClearOnCommit.swap(true) + storage.getMapStorageRangeFor(uuid, start, finish, count) + } catch { case e: Exception => Nil } + override def clear = { + register + shouldClearOnCommit.swap(true) + } + override def contains(key: AnyRef): Boolean = try { - newAndUpdatedEntries.contains(key) || - storage.getMapStorageEntryFor(uuid, key).isDefined - } catch { case e: Exception => false } + newAndUpdatedEntries.contains(key) || storage.getMapStorageEntryFor(uuid, key).isDefined + } catch { case e: Exception => false } override def size: Int = try { - storage.getMapStorageSizeFor(uuid) + newAndUpdatedEntries.size - } catch { case e: Exception => 0 } + storage.getMapStorageSizeFor(uuid) + newAndUpdatedEntries.size + } catch { case e: Exception => 0 } override def get(key: AnyRef): Option[AnyRef] = { if (newAndUpdatedEntries.contains(key)) newAndUpdatedEntries.get(key) @@ -129,6 +143,11 @@ trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] with Tr override def hasNext: Boolean = synchronized { !elements.isEmpty } } } + + private def register = { + if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException + currentTransaction.get.get.register(uuid, this) + } } /** @@ -154,7 +173,7 @@ class MongoPersistentMap extends PersistentMap { * * @author Jonas Bonér */ -trait PersistentVector extends RandomAccessSeq[AnyRef] with Transactional { +trait PersistentVector extends RandomAccessSeq[AnyRef] with Transactional with Committable { protected val newElems = TransactionalState.newVector[AnyRef] protected val updatedElems = TransactionalState.newMap[Int, AnyRef] protected val removedElems = TransactionalState.newVector[AnyRef] @@ -162,7 +181,7 @@ trait PersistentVector extends RandomAccessSeq[AnyRef] with Transactional { val storage: VectorStorage - private[akka] def commit = { + def commit = { // FIXME: should use batch function once the bug is resolved for (element <- newElems) storage.insertVectorStorageEntryFor(uuid, element) for (entry <- updatedElems) storage.updateVectorStorageEntryFor(uuid, entry._1, entry._2) @@ -170,9 +189,12 @@ trait PersistentVector extends RandomAccessSeq[AnyRef] with Transactional { updatedElems.clear } - def +(elem: AnyRef) = newElems + elem - - def add(elem: AnyRef) = newElems + elem + def +(elem: AnyRef) = add(elem) + + def add(elem: AnyRef) = { + register + newElems + elem + } def apply(index: Int): AnyRef = get(index) @@ -193,9 +215,15 @@ trait PersistentVector extends RandomAccessSeq[AnyRef] with Transactional { * Removes the tail element of this vector. */ // FIXME: implement persistent vector pop - def pop: AnyRef = throw new UnsupportedOperationException("need to implement persistent vector pop") + def pop: AnyRef = { + register + throw new UnsupportedOperationException("need to implement persistent vector pop") + } - def update(index: Int, newElem: AnyRef) = storage.updateVectorStorageEntryFor(uuid, index, newElem) + def update(index: Int, newElem: AnyRef) = { + register + storage.updateVectorStorageEntryFor(uuid, index, newElem) + } override def first: AnyRef = get(0) @@ -209,6 +237,11 @@ trait PersistentVector extends RandomAccessSeq[AnyRef] with Transactional { } def length: Int = storage.getVectorStorageSizeFor(uuid) + newElems.length + + private def register = { + if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException + currentTransaction.get.get.register(uuid, this) + } } /** @@ -234,17 +267,20 @@ class MongoPersistentVector extends PersistentVector { * * @author Jonas Bonér */ -trait PersistentRef extends Transactional { +trait PersistentRef extends Transactional with Committable { protected val ref = new TransactionalRef[AnyRef] val storage: RefStorage - private[akka] def commit = if (ref.isDefined) { - storage.insertRefStorageFor(uuid, ref.get) + def commit = if (ref.isDefined) { + storage.insertRefStorageFor(uuid, ref.get.get) ref.swap(null) } - def swap(elem: AnyRef) = ref.swap(elem) + def swap(elem: AnyRef) = { + register + ref.swap(elem) + } def get: Option[AnyRef] = if (ref.isDefined) ref.get else storage.getRefStorageFor(uuid) @@ -252,9 +288,14 @@ trait PersistentRef extends Transactional { def getOrElse(default: => AnyRef): AnyRef = { val current = get - if (current.isDefined) current + if (current.isDefined) current.get else default } + + private def register = { + if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException + currentTransaction.get.get.register(uuid, this) + } } class CassandraPersistentRef extends PersistentRef { diff --git a/akka-persistence/src/test/scala/AllTest.scala b/akka-persistence/src/test/scala/AllTest.scala index e09ace7996..60374da92d 100644 --- a/akka-persistence/src/test/scala/AllTest.scala +++ b/akka-persistence/src/test/scala/AllTest.scala @@ -8,7 +8,7 @@ import junit.framework.TestSuite object AllTest extends TestCase { def suite(): Test = { val suite = new TestSuite("All Scala tests") - suite.addTestSuite(classOf[CassandraPersistentActorSpec]) + //suite.addTestSuite(classOf[CassandraPersistentActorSpec]) //suite.addTestSuite(classOf[MongoPersistentActorSpec]) //suite.addTestSuite(classOf[MongoStorageSpec]) suite diff --git a/akka-samples-java/src/main/java/sample/java/PersistentSimpleService.java b/akka-samples-java/src/main/java/sample/java/PersistentSimpleService.java index 947a0f8b3f..9cbb2075e2 100644 --- a/akka-samples-java/src/main/java/sample/java/PersistentSimpleService.java +++ b/akka-samples-java/src/main/java/sample/java/PersistentSimpleService.java @@ -11,7 +11,7 @@ import javax.ws.rs.Produces; import se.scalablesolutions.akka.annotation.transactionrequired; import se.scalablesolutions.akka.annotation.prerestart; import se.scalablesolutions.akka.annotation.postrestart; -import se.scalablesolutions.akka.state.TransactionalState; +import se.scalablesolutions.akka.state.PersistentMap; import se.scalablesolutions.akka.state.PersistentState; import se.scalablesolutions.akka.state.TransactionalMap; import se.scalablesolutions.akka.state.CassandraStorageConfig; @@ -29,8 +29,7 @@ public class PersistentSimpleService { private String KEY = "COUNTER"; private boolean hasStartedTicking = false; - private PersistentState factory = new PersistentState(); - private TransactionalMap storage = factory.newMap(new CassandraStorageConfig()); + private PersistentMap storage = PersistentState.newMap(new CassandraStorageConfig()); @GET @Produces({"application/html"}) diff --git a/akka-samples-java/src/main/java/sample/java/SimpleService.java b/akka-samples-java/src/main/java/sample/java/SimpleService.java index 55811fb4bc..7702396375 100644 --- a/akka-samples-java/src/main/java/sample/java/SimpleService.java +++ b/akka-samples-java/src/main/java/sample/java/SimpleService.java @@ -28,8 +28,7 @@ public class SimpleService { private String KEY = "COUNTER"; private boolean hasStartedTicking = false; - private TransactionalState factory = new TransactionalState(); - private TransactionalMap storage = factory.newMap(); + private TransactionalMap storage = TransactionalState.newMap(); @GET @Produces({"application/json"}) diff --git a/akka.ipr b/akka.ipr index b6cf496b20..8909e01f8c 100644 --- a/akka.ipr +++ b/akka.ipr @@ -7,7 +7,91 @@ - + - + + + - - - - - - - - - - - - + + + + + + - + + + + + @@ -723,100 +711,86 @@ - + - + - + - + - + - + - + - + - + - + - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - + + + + + + - + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/config/akka-reference.conf b/config/akka-reference.conf index f6443b0ddd..89613a13d8 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -19,7 +19,7 @@ # FQN to the class doing initial active object/actor # supervisor bootstrap, should be defined in default constructor - boot = ["sample.java.Boot", "sample.scala.Boot", "sample.secure.Boot", "se.foldleft.akka.demo.Boot"] + boot = ["sample.java.Boot", "sample.scala.Boot", "sample.secure.Boot"] timeout = 5000 # default timeout for future based invocations @@ -66,7 +66,6 @@ hostname = "127.0.0.1" # IP address or hostname of the MongoDB DB instance port = 27017 dbname = "mydb" - storage-format = "scala-json" # Options: java, scala-json, java-json, protobuf