diff --git a/akka-actor/src/main/scala/akka/AkkaApplication.scala b/akka-actor/src/main/scala/akka/AkkaApplication.scala index 826e9a9ea2..bcfc4dc7a8 100644 --- a/akka-actor/src/main/scala/akka/AkkaApplication.scala +++ b/akka-actor/src/main/scala/akka/AkkaApplication.scala @@ -21,8 +21,6 @@ object AkkaApplication { val VERSION = "2.0-SNAPSHOT" - val GLOBAL_HOME = systemHome orElse envHome - val envHome = System.getenv("AKKA_HOME") match { case null | "" | "." ⇒ None case value ⇒ Some(value) @@ -33,6 +31,8 @@ object AkkaApplication { case value ⇒ Some(value) } + val GLOBAL_HOME = systemHome orElse envHome + val envConf = System.getenv("AKKA_MODE") match { case null | "" ⇒ None case value ⇒ Some(value) @@ -50,7 +50,7 @@ object AkkaApplication { } catch { case _ ⇒ None } val fromClasspath = try { - Some(Configuration.fromResource(defaultLocation)) + Some(Configuration.fromResource(defaultLocation, getClass.getClassLoader)) } catch { case _ ⇒ None } val fromHome = try { @@ -59,14 +59,19 @@ object AkkaApplication { val emptyConfig = Configuration.fromString("akka { version = \"" + VERSION + "\" }") - def apply(name: String): AkkaApplication = new AkkaApplication(name, fromProperties orElse fromClasspath orElse fromHome getOrElse emptyConfig) - - def apply(): AkkaApplication = apply("default") + val defaultConfig = fromProperties orElse fromClasspath orElse fromHome getOrElse emptyConfig + + def apply(name: String): AkkaApplication = new AkkaApplication(name) + + def apply(): AkkaApplication = new AkkaApplication() } class AkkaApplication(val name: String, val config: Configuration) extends ActorRefFactory { + def this(name: String) = this(name, AkkaApplication.defaultConfig) + def this() = this("default") + import AkkaApplication._ object AkkaConfig { @@ -111,7 +116,7 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor // TODO correctly pull its config from the config val dispatcherFactory = new Dispatchers(this) - + implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher // TODO think about memory consistency effects when doing funky stuff inside an ActorRefProvider's constructor @@ -125,11 +130,11 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor // TODO check memory consistency issues val reflective = new ReflectiveAccess(this) - + val routing = new Routing(this) - + val serialization = new Serialization(this) - + val startTime = System.currentTimeMillis def uptime = (System.currentTimeMillis - startTime) / 1000 diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 368488c6d1..311130285c 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -142,7 +142,7 @@ object Timeout { } object Actor { - + type Receive = PartialFunction[Any, Unit] } @@ -163,9 +163,9 @@ object Actor { * @author Jonas Bonér */ trait Actor { - + type Receive = Actor.Receive - + /** * Stores the context for this actor, including self, sender, and hotswap. */ @@ -187,7 +187,7 @@ trait Actor { ActorCell.contextStack.set(contextStack.push(null)) context } - + implicit def application = context.application private def config = application.AkkaConfig @@ -208,7 +208,7 @@ trait Actor { } def apply(o: Any): Unit = r(o) } - + object LoggingReceive { def apply(source: AnyRef, r: Receive): Receive = r match { case _: LoggingReceive ⇒ r diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index df6f888ab7..6891895d08 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -129,8 +129,8 @@ object AllForOneStrategy { * withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window */ case class AllForOneStrategy(trapExit: List[Class[_ <: Throwable]], - maxNrOfRetries: Option[Int] = None, - withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy { + maxNrOfRetries: Option[Int] = None, + withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy { def this(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = this(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) @@ -171,8 +171,8 @@ object OneForOneStrategy { * withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window */ case class OneForOneStrategy(trapExit: List[Class[_ <: Throwable]], - maxNrOfRetries: Option[Int] = None, - withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy { + maxNrOfRetries: Option[Int] = None, + withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy { def this(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = this(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) @@ -214,7 +214,7 @@ private[akka] class ActorCell( @volatile var hotswap: Stack[PartialFunction[Any, Unit]]) extends ActorContext { import ActorCell._ - + def provider = application.provider @volatile diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index b7bb45e05a..cf9e70b4b1 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -147,7 +147,7 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha * @author Jonas Bonér */ class LocalActorRef private[akka] ( - application: AkkaApplication, + application: AkkaApplication, private[this] val props: Props, val address: String, val systemService: Boolean = false, @@ -263,8 +263,8 @@ object RemoteActorSystemMessage { * @author Jonas Bonér */ private[akka] case class RemoteActorRef private[akka] ( - val application: AkkaApplication, - val remote: RemoteSupport, + val application: AkkaApplication, + val remote: RemoteSupport, val remoteAddress: InetSocketAddress, val address: String, loader: Option[ClassLoader]) @@ -386,7 +386,7 @@ case class SerializedActorRef(uuid: Uuid, hostname: String, port: Int) { import akka.serialization.Serialization._ - + @throws(classOf[java.io.ObjectStreamException]) def readResolve(): AnyRef = application.value.registry.local.actorFor(uuid) match { case Some(actor) ⇒ actor diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 2fc2cc6e2b..502b45c8db 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -27,7 +27,7 @@ trait ActorDeployer { * @author Jonas Bonér */ class Deployer(val application: AkkaApplication) extends ActorDeployer { - + val deploymentConfig = new DeploymentConfig(application) // val defaultAddress = Node(Config.nodename) diff --git a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala index 4d9f27b78b..39c96637d7 100644 --- a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala +++ b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala @@ -8,7 +8,7 @@ import akka.routing.{ RouterType, FailureDetectorType } import akka.AkkaApplication object DeploymentConfig { - + // -------------------------------- // --- Deploy // -------------------------------- @@ -239,7 +239,7 @@ object DeploymentConfig { * @author Jonas Bonér */ class DeploymentConfig(val application: AkkaApplication) { - + import DeploymentConfig._ case class ClusterScope( diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index c3a8ec5920..da95c8745a 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -567,7 +567,7 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒ import FSM._ def logDepth: Int = 0 - + private val debugEvent = context.application.AkkaConfig.FsmDebugEvent private val events = new Array[Event](logDepth) diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index 99173e4735..18f7732644 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -91,6 +91,12 @@ case class Props(creator: () ⇒ Actor = Props.defaultCreator, */ def withCreator(c: Creator[Actor]) = copy(creator = () ⇒ c.create) + /** + * Returns a new Props with the specified creator set + * Java API + */ + def withCreator(c: Class[_ <: Actor]) = copy(creator = () ⇒ c.newInstance) + /** * Returns a new Props with the specified dispatcher set * Java API diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 8790fd18bc..e7fd9127a7 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -222,8 +222,8 @@ class TypedActor(val application: AkkaApplication) { //Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling val actorVar = new AtomVar[ActorRef](null) val timeout = props.timeout match { - case Timeout(Duration.MinusInf) => application.AkkaConfig.TIMEOUT - case x => x + case Timeout(Duration.MinusInf) ⇒ application.AkkaConfig.TIMEOUT + case x ⇒ x } val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actorVar)(timeout)).asInstanceOf[T] proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index cbc891aed2..29bdab8eb2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -98,7 +98,7 @@ class Dispatchers(val application: AkkaApplication) { */ def newDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(name, application.AkkaConfig.DispatcherThroughput, - THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config, DISPATCHER_SHUTDOWN_TIMEOUT), ThreadPoolConfig()) + THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config, DISPATCHER_SHUTDOWN_TIMEOUT), ThreadPoolConfig()) /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. @@ -125,7 +125,7 @@ class Dispatchers(val application: AkkaApplication) { */ def newBalancingDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config ⇒ new BalancingDispatcher(name, application.AkkaConfig.DispatcherThroughput, - THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config, DISPATCHER_SHUTDOWN_TIMEOUT), ThreadPoolConfig()) + THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config, DISPATCHER_SHUTDOWN_TIMEOUT), ThreadPoolConfig()) /** * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 9e8a0c0331..bad2d0c294 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -188,7 +188,7 @@ object Future { */ def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], timeout: Timeout, dispatcher: MessageDispatcher): Future[M[A]] = in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) - + def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Timeout)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], dispatcher: MessageDispatcher): Future[M[A]] = sequence(in)(cbf, timeout, dispatcher) @@ -206,7 +206,7 @@ object Future { def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout)(implicit dispatcher: MessageDispatcher): Future[T] = firstCompletedOf(futures)(dispatcher, timeout) - + /** * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate */ @@ -227,9 +227,9 @@ object Future { } } - def find[T](futures: Iterable[Future[T]], timeout:Timeout)(predicate: T ⇒ Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] = + def find[T](futures: Iterable[Future[T]], timeout: Timeout)(predicate: T ⇒ Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] = find(futures)(predicate)(dispatcher, timeout) - + /** * A non-blocking fold over the specified futures. * The fold is performed on the thread where the last future is completed, @@ -281,7 +281,7 @@ object Future { result } } - + def fold[T, R](futures: Iterable[Future[T]], timeout: Timeout)(zero: R)(foldFun: (R, T) ⇒ R)(implicit dispatcher: MessageDispatcher): Future[R] = fold(futures)(zero)(foldFun)(dispatcher, timeout) @@ -310,7 +310,7 @@ object Future { result } } - + def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout)(op: (R, T) ⇒ T)(implicit dispatcher: MessageDispatcher): Future[R] = reduce(futures)(op)(dispatcher, timeout) @@ -327,7 +327,7 @@ object Future { val fb = fn(a.asInstanceOf[A]) for (r ← fr; b ← fb) yield (r += b) }.map(_.result) - + def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Timeout)(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], dispatcher: MessageDispatcher): Future[M[B]] = traverse(in)(fn)(cbf, timeout, dispatcher) @@ -356,8 +356,8 @@ object Future { }, true) future } - - // TODO make variant of flow(timeout)(body) which does NOT break type inference + + // TODO make variant of flow(timeout)(body) which does NOT break type inference private val _taskStack = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() { override def initialValue = None diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala index a7bc7451cd..00867ffb0c 100644 --- a/akka-actor/src/main/scala/akka/event/EventHandler.scala +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -54,10 +54,10 @@ import akka.AkkaApplication * @author Jonas Bonér */ object EventHandler extends ListenerManagement { - + // TODO remove this EVIL thing! private val appl = AkkaApplication("akka-reference.conf") - + val synchronousLogging: Boolean = System.getProperty("akka.event.force-sync") match { case null | "" ⇒ false case _ ⇒ true diff --git a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala index 2c1831a3cd..84c836d192 100644 --- a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala @@ -221,7 +221,7 @@ abstract class RemoteSupport(val application: AkkaApplication) extends ListenerM /** * This is the interface for the RemoteServer functionality, it's used in Actor.remote */ -trait RemoteServerModule extends RemoteModule { this: RemoteSupport => +trait RemoteServerModule extends RemoteModule { this: RemoteSupport ⇒ protected val guard = new ReentrantGuard /** diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index ae7ade0796..2a0dc2a7b3 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -126,7 +126,7 @@ class RemoveConnectionOnFirstFailureLocalFailureDetector extends FailureDetector } object Routing { - + sealed trait RoutingMessage case class Broadcast(message: Any) extends RoutingMessage diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index f32d220ec3..3f7d1c3750 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -16,7 +16,7 @@ import java.net.InetSocketAddress import akka.AkkaApplication object ReflectiveAccess { - + val loader = getClass.getClassLoader val emptyParams: Array[Class[_]] = Array() val emptyArguments: Array[AnyRef] = Array() @@ -124,7 +124,7 @@ object ReflectiveAccess { * @author Jonas Bonér */ class ReflectiveAccess(val application: AkkaApplication) { - + import ReflectiveAccess._ /** diff --git a/akka-stm/src/main/scala/akka/agent/Agent.scala b/akka-stm/src/main/scala/akka/agent/Agent.scala index c279cba8e5..6e5bcdfb6b 100644 --- a/akka-stm/src/main/scala/akka/agent/Agent.scala +++ b/akka-stm/src/main/scala/akka/agent/Agent.scala @@ -4,11 +4,11 @@ package akka.agent +import akka.AkkaApplication +import akka.actor._ import akka.stm._ import akka.japi.{ Function ⇒ JFunc, Procedure ⇒ JProc } -import akka.dispatch.{ PinnedDispatcher, DefaultPromise, Dispatchers, Future } -import akka.actor.{ Props, LocalActorRef, Actor } -import akka.actor.Actor._ +import akka.dispatch.{ PinnedDispatcher, UnboundedMailbox, DefaultPromise, Dispatchers, Future } /** * Used internally to send functions. @@ -20,7 +20,7 @@ private[akka] case object Get * Factory method for creating an Agent. */ object Agent { - def apply[T](initialValue: T) = new Agent(initialValue) + def apply[T](initialValue: T)(implicit application: AkkaApplication) = new Agent(initialValue, application) } /** @@ -93,9 +93,9 @@ object Agent { * agent4.close * }}} */ -class Agent[T](initialValue: T) { +class Agent[T](initialValue: T, application: AkkaApplication) { private[akka] val ref = Ref(initialValue) - private[akka] val updater = actorOf(Props(new AgentUpdater(this))).asInstanceOf[LocalActorRef] //TODO can we avoid this somehow? + private[akka] val updater = application.createActor(Props(new AgentUpdater(this))).asInstanceOf[LocalActorRef] //TODO can we avoid this somehow? /** * Read the internal state of the agent. @@ -117,13 +117,13 @@ class Agent[T](initialValue: T) { } /** - * Dispatch a function to update the internal state, and return a Future where that new state can be obtained - * within the given timeout + * Dispatch a function to update the internal state, and return a Future where + * that new state can be obtained within the given timeout. */ - def alter(f: T ⇒ T)(timeout: Long): Future[T] = { + def alter(f: T ⇒ T)(timeout: Timeout): Future[T] = { def dispatch = updater.?(Update(f), timeout).asInstanceOf[Future[T]] if (Stm.activeTransaction) { - val result = new DefaultPromise[T](timeout) + val result = new DefaultPromise[T](timeout)(application.dispatcher) get //Join xa deferred { result completeWith dispatch @@ -150,12 +150,15 @@ class Agent[T](initialValue: T) { * or blocking operations. Dispatches using either `sendOff` or `send` will * still be executed in order. */ - def sendOff(f: T ⇒ T): Unit = send((value: T) ⇒ { - suspend() - val threadBased = actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(new PinnedDispatcher())) - threadBased ! Update(f) - value - }) + def sendOff(f: T ⇒ T): Unit = { + send((value: T) ⇒ { + suspend() + val pinnedDispatcher = new PinnedDispatcher(null, "agent-send-off", UnboundedMailbox(), application.AkkaConfig.TimeoutMillis) + val threadBased = application.createActor(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher)) + threadBased ! Update(f) + value + }) + } /** * Dispatch a function to update the internal state but on its own thread, @@ -164,11 +167,12 @@ class Agent[T](initialValue: T) { * or blocking operations. Dispatches using either `alterOff` or `alter` will * still be executed in order. */ - def alterOff(f: T ⇒ T)(timeout: Long): Future[T] = { - val result = new DefaultPromise[T](timeout) + def alterOff(f: T ⇒ T)(timeout: Timeout): Future[T] = { + val result = new DefaultPromise[T](timeout)(application.dispatcher) send((value: T) ⇒ { suspend() - val threadBased = Actor.actorOf(new ThreadBasedAgentUpdater(this)) + val pinnedDispatcher = new PinnedDispatcher(null, "agent-alter-off", UnboundedMailbox(), application.AkkaConfig.TimeoutMillis) + val threadBased = application.createActor(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher)) result completeWith threadBased.?(Update(f), timeout).asInstanceOf[Future[T]] value }) @@ -179,18 +183,18 @@ class Agent[T](initialValue: T) { * A future to the current value that will be completed after any currently * queued updates. */ - def future(): Future[T] = (updater ? Get).asInstanceOf[Future[T]] + def future(implicit timeout: Timeout): Future[T] = (updater ? Get).asInstanceOf[Future[T]] /** * Gets this agent's value after all currently queued updates have completed. */ - def await(): T = future.await.result.get + def await(implicit timeout: Timeout): T = future.await.result.get /** * Map this agent to a new agent, applying the function to the internal state. * Does not change the value of this agent. */ - def map[B](f: T ⇒ B): Agent[B] = Agent(f(get)) + def map[B](f: T ⇒ B): Agent[B] = Agent(f(get))(application) /** * Flatmap this agent to a new agent, applying the function to the internal state. @@ -260,7 +264,7 @@ class Agent[T](initialValue: T) { * Map this agent to a new agent, applying the function to the internal state. * Does not change the value of this agent. */ - def map[B](f: JFunc[T, B]): Agent[B] = Agent(f(get)) + def map[B](f: JFunc[T, B]): Agent[B] = Agent(f(get))(application) /** * Java API: diff --git a/akka-stm/src/main/scala/akka/stm/TransactionFactory.scala b/akka-stm/src/main/scala/akka/stm/TransactionFactory.scala index 76364bf160..d156ea5e3a 100644 --- a/akka-stm/src/main/scala/akka/stm/TransactionFactory.scala +++ b/akka-stm/src/main/scala/akka/stm/TransactionFactory.scala @@ -6,47 +6,32 @@ package akka.stm import java.lang.{ Boolean ⇒ JBoolean } -import akka.config.Config._ import akka.util.Duration import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance import org.multiverse.stms.alpha.AlphaStm import org.multiverse.templates.TransactionBoilerplate -import org.multiverse.api.{ PropagationLevel ⇒ MPropagation } +import org.multiverse.api.PropagationLevel import org.multiverse.api.{ TraceLevel ⇒ MTraceLevel } /** * For configuring multiverse transactions. */ object TransactionConfig { - // note: null values are so that we can default to Multiverse inference when not set - val FAMILY_NAME = "DefaultTransaction" - val READONLY = null.asInstanceOf[JBoolean] - val MAX_RETRIES = config.getInt("akka.stm.max-retries", 1000) - val TIMEOUT = config.getLong("akka.stm.timeout", 5) - val TRACK_READS = null.asInstanceOf[JBoolean] - val WRITE_SKEW = config.getBool("akka.stm.write-skew", true) - val BLOCKING_ALLOWED = config.getBool("akka.stm.blocking-allowed", false) - val INTERRUPTIBLE = config.getBool("akka.stm.interruptible", false) - val SPECULATIVE = config.getBool("akka.stm.speculative", true) - val QUICK_RELEASE = config.getBool("akka.stm.quick-release", true) - val PROPAGATION = propagation(config.getString("akka.stm.propagation", "requires")) - val TRACE_LEVEL = traceLevel(config.getString("akka.stm.trace-level", "none")) - - val DefaultTimeout = Duration(TIMEOUT, TIME_UNIT) - - def propagation(level: String) = level.toLowerCase match { - case "requiresnew" ⇒ Propagation.RequiresNew - case "fine" ⇒ Propagation.Mandatory - case "supports" ⇒ Propagation.Supports - case "never" ⇒ Propagation.Never - case _ ⇒ Propagation.Requires - } - - def traceLevel(level: String) = level.toLowerCase match { - case "coarse" | "course" ⇒ TraceLevel.Coarse - case "fine" ⇒ TraceLevel.Fine - case _ ⇒ TraceLevel.None + object Default { + // note: null values are so that we can default to Multiverse inference when not set + val FamilyName = "DefaultTransaction" + val Readonly = null.asInstanceOf[JBoolean] + val MaxRetries = 1000 + val Timeout = Duration(5, "seconds") + val TrackReads = null.asInstanceOf[JBoolean] + val WriteSkew = true + val BlockingAllowed = false + val Interruptible = false + val Speculative = true + val QuickRelease = true + val Propagation = PropagationLevel.Requires + val TraceLevel = MTraceLevel.none } /** @@ -65,18 +50,18 @@ object TransactionConfig { * @param propagation For controlling how nested transactions behave. * @param traceLevel Transaction trace level. */ - def apply(familyName: String = FAMILY_NAME, - readonly: JBoolean = READONLY, - maxRetries: Int = MAX_RETRIES, - timeout: Duration = DefaultTimeout, - trackReads: JBoolean = TRACK_READS, - writeSkew: Boolean = WRITE_SKEW, - blockingAllowed: Boolean = BLOCKING_ALLOWED, - interruptible: Boolean = INTERRUPTIBLE, - speculative: Boolean = SPECULATIVE, - quickRelease: Boolean = QUICK_RELEASE, - propagation: MPropagation = PROPAGATION, - traceLevel: MTraceLevel = TRACE_LEVEL) = { + def apply(familyName: String = Default.FamilyName, + readonly: JBoolean = Default.Readonly, + maxRetries: Int = Default.MaxRetries, + timeout: Duration = Default.Timeout, + trackReads: JBoolean = Default.TrackReads, + writeSkew: Boolean = Default.WriteSkew, + blockingAllowed: Boolean = Default.BlockingAllowed, + interruptible: Boolean = Default.Interruptible, + speculative: Boolean = Default.Speculative, + quickRelease: Boolean = Default.QuickRelease, + propagation: PropagationLevel = Default.Propagation, + traceLevel: MTraceLevel = Default.TraceLevel) = { new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed, interruptible, speculative, quickRelease, propagation, traceLevel) } @@ -98,18 +83,18 @@ object TransactionConfig { *
propagation - For controlling how nested transactions behave. *
traceLevel - Transaction trace level.
*/
-class TransactionConfig(val familyName: String = TransactionConfig.FAMILY_NAME,
- val readonly: JBoolean = TransactionConfig.READONLY,
- val maxRetries: Int = TransactionConfig.MAX_RETRIES,
- val timeout: Duration = TransactionConfig.DefaultTimeout,
- val trackReads: JBoolean = TransactionConfig.TRACK_READS,
- val writeSkew: Boolean = TransactionConfig.WRITE_SKEW,
- val blockingAllowed: Boolean = TransactionConfig.BLOCKING_ALLOWED,
- val interruptible: Boolean = TransactionConfig.INTERRUPTIBLE,
- val speculative: Boolean = TransactionConfig.SPECULATIVE,
- val quickRelease: Boolean = TransactionConfig.QUICK_RELEASE,
- val propagation: MPropagation = TransactionConfig.PROPAGATION,
- val traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL)
+class TransactionConfig(val familyName: String = TransactionConfig.Default.FamilyName,
+ val readonly: JBoolean = TransactionConfig.Default.Readonly,
+ val maxRetries: Int = TransactionConfig.Default.MaxRetries,
+ val timeout: Duration = TransactionConfig.Default.Timeout,
+ val trackReads: JBoolean = TransactionConfig.Default.TrackReads,
+ val writeSkew: Boolean = TransactionConfig.Default.WriteSkew,
+ val blockingAllowed: Boolean = TransactionConfig.Default.BlockingAllowed,
+ val interruptible: Boolean = TransactionConfig.Default.Interruptible,
+ val speculative: Boolean = TransactionConfig.Default.Speculative,
+ val quickRelease: Boolean = TransactionConfig.Default.QuickRelease,
+ val propagation: PropagationLevel = TransactionConfig.Default.Propagation,
+ val traceLevel: MTraceLevel = TransactionConfig.Default.TraceLevel)
object DefaultTransactionConfig extends TransactionConfig
@@ -121,18 +106,18 @@ object TransactionFactory {
def apply(config: TransactionConfig, defaultName: String) = new TransactionFactory(config, defaultName)
- def apply(familyName: String = TransactionConfig.FAMILY_NAME,
- readonly: JBoolean = TransactionConfig.READONLY,
- maxRetries: Int = TransactionConfig.MAX_RETRIES,
- timeout: Duration = TransactionConfig.DefaultTimeout,
- trackReads: JBoolean = TransactionConfig.TRACK_READS,
- writeSkew: Boolean = TransactionConfig.WRITE_SKEW,
- blockingAllowed: Boolean = TransactionConfig.BLOCKING_ALLOWED,
- interruptible: Boolean = TransactionConfig.INTERRUPTIBLE,
- speculative: Boolean = TransactionConfig.SPECULATIVE,
- quickRelease: Boolean = TransactionConfig.QUICK_RELEASE,
- propagation: MPropagation = TransactionConfig.PROPAGATION,
- traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL) = {
+ def apply(familyName: String = TransactionConfig.Default.FamilyName,
+ readonly: JBoolean = TransactionConfig.Default.Readonly,
+ maxRetries: Int = TransactionConfig.Default.MaxRetries,
+ timeout: Duration = TransactionConfig.Default.Timeout,
+ trackReads: JBoolean = TransactionConfig.Default.TrackReads,
+ writeSkew: Boolean = TransactionConfig.Default.WriteSkew,
+ blockingAllowed: Boolean = TransactionConfig.Default.BlockingAllowed,
+ interruptible: Boolean = TransactionConfig.Default.Interruptible,
+ speculative: Boolean = TransactionConfig.Default.Speculative,
+ quickRelease: Boolean = TransactionConfig.Default.QuickRelease,
+ propagation: PropagationLevel = TransactionConfig.Default.Propagation,
+ traceLevel: MTraceLevel = TransactionConfig.Default.TraceLevel) = {
val config = new TransactionConfig(
familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed,
interruptible, speculative, quickRelease, propagation, traceLevel)
@@ -163,10 +148,10 @@ object TransactionFactory {
*/
class TransactionFactory(
val config: TransactionConfig = DefaultTransactionConfig,
- defaultName: String = TransactionConfig.FAMILY_NAME) { self ⇒
+ defaultName: String = TransactionConfig.Default.FamilyName) { self ⇒
// use the config family name if it's been set, otherwise defaultName - used by actors to set class name as default
- val familyName = if (config.familyName != TransactionConfig.FAMILY_NAME) config.familyName else defaultName
+ val familyName = if (config.familyName != TransactionConfig.Default.FamilyName) config.familyName else defaultName
val factory = {
var builder = (getGlobalStmInstance().asInstanceOf[AlphaStm].getTransactionFactoryBuilder()
@@ -199,11 +184,11 @@ class TransactionFactory(
* Mapping to Multiverse PropagationLevel.
*/
object Propagation {
- val RequiresNew = MPropagation.RequiresNew
- val Mandatory = MPropagation.Mandatory
- val Requires = MPropagation.Requires
- val Supports = MPropagation.Supports
- val Never = MPropagation.Never
+ val RequiresNew = PropagationLevel.RequiresNew
+ val Mandatory = PropagationLevel.Mandatory
+ val Requires = PropagationLevel.Requires
+ val Supports = PropagationLevel.Supports
+ val Never = PropagationLevel.Never
}
/**
diff --git a/akka-stm/src/main/scala/akka/stm/TransactionFactoryBuilder.scala b/akka-stm/src/main/scala/akka/stm/TransactionFactoryBuilder.scala
index e975d284f8..7b6203f07b 100644
--- a/akka-stm/src/main/scala/akka/stm/TransactionFactoryBuilder.scala
+++ b/akka-stm/src/main/scala/akka/stm/TransactionFactoryBuilder.scala
@@ -9,24 +9,24 @@ import java.lang.{ Boolean ⇒ JBoolean }
import akka.util.Duration
import org.multiverse.api.{ TraceLevel ⇒ MTraceLevel }
-import org.multiverse.api.{ PropagationLevel ⇒ MPropagation }
+import org.multiverse.api.PropagationLevel
/**
* For more easily creating TransactionConfig from Java.
*/
class TransactionConfigBuilder {
- var familyName: String = TransactionConfig.FAMILY_NAME
- var readonly: JBoolean = TransactionConfig.READONLY
- var maxRetries: Int = TransactionConfig.MAX_RETRIES
- var timeout: Duration = TransactionConfig.DefaultTimeout
- var trackReads: JBoolean = TransactionConfig.TRACK_READS
- var writeSkew: Boolean = TransactionConfig.WRITE_SKEW
- var blockingAllowed: Boolean = TransactionConfig.BLOCKING_ALLOWED
- var interruptible: Boolean = TransactionConfig.INTERRUPTIBLE
- var speculative: Boolean = TransactionConfig.SPECULATIVE
- var quickRelease: Boolean = TransactionConfig.QUICK_RELEASE
- var propagation: MPropagation = TransactionConfig.PROPAGATION
- var traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL
+ var familyName: String = TransactionConfig.Default.FamilyName
+ var readonly: JBoolean = TransactionConfig.Default.Readonly
+ var maxRetries: Int = TransactionConfig.Default.MaxRetries
+ var timeout: Duration = TransactionConfig.Default.Timeout
+ var trackReads: JBoolean = TransactionConfig.Default.TrackReads
+ var writeSkew: Boolean = TransactionConfig.Default.WriteSkew
+ var blockingAllowed: Boolean = TransactionConfig.Default.BlockingAllowed
+ var interruptible: Boolean = TransactionConfig.Default.Interruptible
+ var speculative: Boolean = TransactionConfig.Default.Speculative
+ var quickRelease: Boolean = TransactionConfig.Default.QuickRelease
+ var propagation: PropagationLevel = TransactionConfig.Default.Propagation
+ var traceLevel: MTraceLevel = TransactionConfig.Default.TraceLevel
def setFamilyName(familyName: String) = { this.familyName = familyName; this }
def setReadonly(readonly: JBoolean) = { this.readonly = readonly; this }
@@ -38,7 +38,7 @@ class TransactionConfigBuilder {
def setInterruptible(interruptible: Boolean) = { this.interruptible = interruptible; this }
def setSpeculative(speculative: Boolean) = { this.speculative = speculative; this }
def setQuickRelease(quickRelease: Boolean) = { this.quickRelease = quickRelease; this }
- def setPropagation(propagation: MPropagation) = { this.propagation = propagation; this }
+ def setPropagation(propagation: PropagationLevel) = { this.propagation = propagation; this }
def setTraceLevel(traceLevel: MTraceLevel) = { this.traceLevel = traceLevel; this }
def build() = new TransactionConfig(
@@ -50,18 +50,18 @@ class TransactionConfigBuilder {
* For more easily creating TransactionFactory from Java.
*/
class TransactionFactoryBuilder {
- var familyName: String = TransactionConfig.FAMILY_NAME
- var readonly: JBoolean = TransactionConfig.READONLY
- var maxRetries: Int = TransactionConfig.MAX_RETRIES
- var timeout: Duration = TransactionConfig.DefaultTimeout
- var trackReads: JBoolean = TransactionConfig.TRACK_READS
- var writeSkew: Boolean = TransactionConfig.WRITE_SKEW
- var blockingAllowed: Boolean = TransactionConfig.BLOCKING_ALLOWED
- var interruptible: Boolean = TransactionConfig.INTERRUPTIBLE
- var speculative: Boolean = TransactionConfig.SPECULATIVE
- var quickRelease: Boolean = TransactionConfig.QUICK_RELEASE
- var propagation: MPropagation = TransactionConfig.PROPAGATION
- var traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL
+ var familyName: String = TransactionConfig.Default.FamilyName
+ var readonly: JBoolean = TransactionConfig.Default.Readonly
+ var maxRetries: Int = TransactionConfig.Default.MaxRetries
+ var timeout: Duration = TransactionConfig.Default.Timeout
+ var trackReads: JBoolean = TransactionConfig.Default.TrackReads
+ var writeSkew: Boolean = TransactionConfig.Default.WriteSkew
+ var blockingAllowed: Boolean = TransactionConfig.Default.BlockingAllowed
+ var interruptible: Boolean = TransactionConfig.Default.Interruptible
+ var speculative: Boolean = TransactionConfig.Default.Speculative
+ var quickRelease: Boolean = TransactionConfig.Default.QuickRelease
+ var propagation: PropagationLevel = TransactionConfig.Default.Propagation
+ var traceLevel: MTraceLevel = TransactionConfig.Default.TraceLevel
def setFamilyName(familyName: String) = { this.familyName = familyName; this }
def setReadonly(readonly: JBoolean) = { this.readonly = readonly; this }
@@ -73,7 +73,7 @@ class TransactionFactoryBuilder {
def setInterruptible(interruptible: Boolean) = { this.interruptible = interruptible; this }
def setSpeculative(speculative: Boolean) = { this.speculative = speculative; this }
def setQuickRelease(quickRelease: Boolean) = { this.quickRelease = quickRelease; this }
- def setPropagation(propagation: MPropagation) = { this.propagation = propagation; this }
+ def setPropagation(propagation: PropagationLevel) = { this.propagation = propagation; this }
def setTraceLevel(traceLevel: MTraceLevel) = { this.traceLevel = traceLevel; this }
def build() = {
diff --git a/akka-stm/src/main/scala/akka/transactor/Coordinated.scala b/akka-stm/src/main/scala/akka/transactor/Coordinated.scala
index f52732294d..a7a30e7cee 100644
--- a/akka-stm/src/main/scala/akka/transactor/Coordinated.scala
+++ b/akka-stm/src/main/scala/akka/transactor/Coordinated.scala
@@ -5,7 +5,6 @@
package akka.transactor
import akka.AkkaException
-import akka.config.Config
import akka.stm.{ Atomic, DefaultTransactionConfig, TransactionFactory }
import org.multiverse.commitbarriers.CountDownCommitBarrier
@@ -26,13 +25,12 @@ class CoordinatedTransactionException(message: String, cause: Throwable = null)
*/
object Coordinated {
val DefaultFactory = TransactionFactory(DefaultTransactionConfig, "DefaultCoordinatedTransaction")
- val Fair = Config.config.getBool("akka.stm.fair", true)
def apply(message: Any = null) = new Coordinated(message, createBarrier)
def unapply(c: Coordinated): Option[Any] = Some(c.message)
- def createBarrier = new CountDownCommitBarrier(1, Fair)
+ def createBarrier = new CountDownCommitBarrier(1, true)
}
/**
diff --git a/akka-stm/src/test/java/akka/stm/example/EitherOrElseExample.java b/akka-stm/src/test/java/akka/stm/example/EitherOrElseExample.java
index b3c386fb09..efc585539b 100644
--- a/akka-stm/src/test/java/akka/stm/example/EitherOrElseExample.java
+++ b/akka-stm/src/test/java/akka/stm/example/EitherOrElseExample.java
@@ -1,5 +1,6 @@
package akka.stm.example;
+import akka.AkkaApplication;
import akka.stm.*;
import akka.actor.*;
@@ -9,10 +10,12 @@ public class EitherOrElseExample {
System.out.println("EitherOrElse example");
System.out.println();
+ AkkaApplication application = new AkkaApplication("UntypedTransactorExample");
+
final Ref