diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index cd4ae2556c..ae474b5df2 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -216,6 +216,7 @@ trait Actor extends Logging with TransactionManagement { isRunning = true if (isTransactional) this ! TransactionalInit } + log.info("[%s] has started", toString) } /** @@ -364,8 +365,8 @@ trait Actor extends Logging with TransactionManagement { *

* To be invoked from within the actor itself. */ - protected[this] def startLinkRemote(actor: Actor) = { - actor.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT) + protected[this] def startLinkRemote(actor: Actor, hostname: String, port: Int) = { + actor.makeRemote(hostname, port) actor.start link(actor) } @@ -377,8 +378,10 @@ trait Actor extends Logging with TransactionManagement { */ protected[this] def spawn[T <: Actor](actorClass: Class[T]): T = { val actor = actorClass.newInstance.asInstanceOf[T] - actor.dispatcher = dispatcher - actor.mailbox = mailbox + if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) { + actor.dispatcher = dispatcher + actor.mailbox = mailbox + } actor.start actor } @@ -388,11 +391,13 @@ trait Actor extends Logging with TransactionManagement { *

* To be invoked from within the actor itself. */ - protected[this] def spawnRemote[T <: Actor](actorClass: Class[T]): T = { + protected[this] def spawnRemote[T <: Actor](actorClass: Class[T], hostname: String, port: Int): T = { val actor = actorClass.newInstance.asInstanceOf[T] - actor.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT) - actor.dispatcher = dispatcher - actor.mailbox = mailbox + actor.makeRemote(hostname, port) + if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) { + actor.dispatcher = dispatcher + actor.mailbox = mailbox + } actor.start actor } @@ -413,9 +418,9 @@ trait Actor extends Logging with TransactionManagement { *

* To be invoked from within the actor itself. */ - protected[this] def spawnLinkRemote[T <: Actor](actorClass: Class[T]): T = { + protected[this] def spawnLinkRemote[T <: Actor](actorClass: Class[T], hostname: String, port: Int): T = { val actor = spawn[T](actorClass) - actor.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT) + actor.makeRemote(hostname, port) link(actor) actor } diff --git a/akka-actors/src/main/scala/stm/Transaction.scala b/akka-actors/src/main/scala/stm/Transaction.scala index 3da964a10e..a847f4edca 100644 --- a/akka-actors/src/main/scala/stm/Transaction.scala +++ b/akka-actors/src/main/scala/stm/Transaction.scala @@ -7,6 +7,7 @@ package se.scalablesolutions.akka.stm import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import state.Transactional import util.Logging +import actor.Actor import org.multiverse.api.{Transaction => MultiverseTransaction} import org.multiverse.stms.alpha.AlphaStm @@ -74,6 +75,9 @@ object Transaction { @volatile private[this] var status: TransactionStatus = TransactionStatus.New private[akka] var transaction: MultiverseTransaction = _ +// private[this] var initMessage: Option[AnyRef] = None +// private[this] var initReceiver: Option[Actor] = None + private[this] var participants: List[String] = Nil private[this] var precommitted: List[String] = Nil @@ -84,7 +88,10 @@ object Transaction { def isTopLevel = depth.compareAndSet(0, 0) def begin(participant: String) = synchronized { +// def begin(participant: String, message, receiver) = synchronized { ensureIsActiveOrNew +// initMessage = Some(message) +// initReceiver = Some(receiver) transaction = Multiverse.STM.startUpdateTransaction("akka") log.debug("Creating a new transaction with id [%s]", id) diff --git a/akka-actors/src/main/scala/stm/TransactionManagement.scala b/akka-actors/src/main/scala/stm/TransactionManagement.scala index 9b74a63fba..6cabdaa9d9 100644 --- a/akka-actors/src/main/scala/stm/TransactionManagement.scala +++ b/akka-actors/src/main/scala/stm/TransactionManagement.scala @@ -22,7 +22,7 @@ object TransactionManagement { import Config._ val TIME_WAITING_FOR_COMPLETION = config.getInt("akka.stm.wait-for-completion", 100) val NR_OF_TIMES_WAITING_FOR_COMPLETION = config.getInt("akka.stm.wait-nr-of-times", 3) - val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", true)) + val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", false)) // FIXME reenable 'akka.stm.restart-on-collision' when new STM is in place val RESTART_TRANSACTION_ON_COLLISION = false //akka.Kernel.config.getBool("akka.stm.restart-on-collision", true) diff --git a/akka-actors/src/main/scala/stm/TransactionalState.scala b/akka-actors/src/main/scala/stm/TransactionalState.scala index cf626fc5a9..29c54d273f 100644 --- a/akka-actors/src/main/scala/stm/TransactionalState.scala +++ b/akka-actors/src/main/scala/stm/TransactionalState.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.state import stm.{TransactionManagement, Ref} import org.multiverse.templates.AtomicTemplate import org.multiverse.api.Transaction; -import akka.collection._ +import collection._ import org.codehaus.aspectwerkz.proxy.Uuid @@ -71,15 +71,17 @@ class TransactionalRef[T] extends Transactional { def swap(elem: T) = ref.set(elem) def get: Option[T] = { - if (ref.isNull) None - else Some(ref.get) +// if (ref.isNull) None + // else + Some(ref.get) } def getOrWait: T = ref.getOrAwait def getOrElse(default: => T): T = { - if (ref.isNull) default - else ref.get +// if (ref.isNull) default + //else + ref.get } def isDefined: Boolean = !ref.isNull diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala index 24efe78eff..8facc5d458 100644 --- a/akka-kernel/src/main/scala/Kernel.scala +++ b/akka-kernel/src/main/scala/Kernel.scala @@ -24,9 +24,9 @@ object Kernel extends Logging { import Config._ val BOOT_CLASSES = config.getList("akka.boot") - val RUN_REMOTE_SERVICE = config.getBool("akka.remote.service", true) + val RUN_REMOTE_SERVICE = config.getBool("akka.remote.service", false) + val RUN_REST_SERVICE = config.getBool("akka.rest.service", false) val STORAGE_SYSTEM = config.getString("akka.storage.system", "cassandra") - val RUN_REST_SERVICE = config.getBool("akka.rest.service", true) val REST_HOSTNAME = config.getString("akka.rest.hostname", "localhost") val REST_URL = "http://" + REST_HOSTNAME val REST_PORT = config.getInt("akka.rest.port", 9998) diff --git a/akka-persistence/pom.xml b/akka-persistence/pom.xml index a0563f1cb0..749d4c6121 100644 --- a/akka-persistence/pom.xml +++ b/akka-persistence/pom.xml @@ -37,18 +37,18 @@ org.apache.cassandra cassandra - 0.4.0-trunk + 0.4.0 com.facebook thrift 1.0 - + commons-pool commons-pool diff --git a/akka-util-java/src/main/java/se/scalablesolutions/akka/config/Ref.java b/akka-util-java/src/main/java/se/scalablesolutions/akka/stm/Ref.java similarity index 98% rename from akka-util-java/src/main/java/se/scalablesolutions/akka/config/Ref.java rename to akka-util-java/src/main/java/se/scalablesolutions/akka/stm/Ref.java index b9ded29df3..1c2f100f26 100644 --- a/akka-util-java/src/main/java/se/scalablesolutions/akka/config/Ref.java +++ b/akka-util-java/src/main/java/se/scalablesolutions/akka/stm/Ref.java @@ -22,7 +22,8 @@ import static java.lang.String.format; * @author Peter Veentjer */ public final class Ref extends FastAtomicObjectMixin implements ManagedRef { - final public static class NoTransactionInScopeException extends RuntimeException {} + final public static class NoTransactionInScopeException extends RuntimeException { + } public Ref() { Transaction tx = getThreadLocalTransaction(); @@ -56,7 +57,6 @@ public final class Ref extends FastAtomicObjectMixin implements ManagedRef return tranlocalRef.get(); } - @Override public E getOrAwait() { Transaction tx = getThreadLocalTransaction(); if (tx == null) throw new NoTransactionInScopeException(); @@ -69,8 +69,6 @@ public final class Ref extends FastAtomicObjectMixin implements ManagedRef return tranlocalRef.getOrAwait(); } - - @Override public E set(final E newRef) { Transaction tx = getThreadLocalTransaction(); if (tx == null) throw new NoTransactionInScopeException(); @@ -83,7 +81,6 @@ public final class Ref extends FastAtomicObjectMixin implements ManagedRef return tranlocalRef.set(newRef); } - @Override public boolean isNull() { Transaction tx = getThreadLocalTransaction(); if (tx == null) throw new NoTransactionInScopeException(); @@ -96,7 +93,6 @@ public final class Ref extends FastAtomicObjectMixin implements ManagedRef return tranlocalRef.isNull(); } - @Override public E clear() { Transaction tx = getThreadLocalTransaction(); if (tx == null) throw new NoTransactionInScopeException(); @@ -122,7 +118,6 @@ public final class Ref extends FastAtomicObjectMixin implements ManagedRef return tranlocalRef.toString(); } - @Override public RefTranlocal privatize(long readVersion) { RefTranlocal origin = (RefTranlocal) load(readVersion); if (origin == null) { diff --git a/akka.iws b/akka.iws index 11c532ab33..4e68a7a766 100644 --- a/akka.iws +++ b/akka.iws @@ -2,20 +2,17 @@ - - - - + - - - - - + + + + + + - @@ -36,7 +33,7 @@ - +