diff --git a/akka-actors/src/main/scala/actor/ActiveObject.scala b/akka-actors/src/main/scala/actor/ActiveObject.scala index f6b45bcf53..d3584ed93b 100644 --- a/akka-actors/src/main/scala/actor/ActiveObject.scala +++ b/akka-actors/src/main/scala/actor/ActiveObject.scala @@ -37,73 +37,145 @@ object ActiveObject { val AKKA_CAMEL_ROUTING_SCHEME = "akka" def newInstance[T](target: Class[T], timeout: Long): T = - newInstance(target, new Dispatcher(None), None, timeout) + newInstance(target, new Dispatcher(false, None), None, timeout) def newInstance[T](target: Class[T], timeout: Long, restartCallbacks: Option[RestartCallbacks]): T = - newInstance(target, new Dispatcher(restartCallbacks), None, timeout) + newInstance(target, new Dispatcher(false, restartCallbacks), None, timeout) def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T = - newInstance(intf, target, new Dispatcher(None), None, timeout) + newInstance(intf, target, new Dispatcher(false, None), None, timeout) def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, restartCallbacks: Option[RestartCallbacks]): T = - newInstance(intf, target, new Dispatcher(restartCallbacks), None, timeout) + newInstance(intf, target, new Dispatcher(false, restartCallbacks), None, timeout) + + def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean): T = + newInstance(target, new Dispatcher(transactionRequired, None), None, timeout) + + def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, restartCallbacks: Option[RestartCallbacks]): T = + newInstance(target, new Dispatcher(transactionRequired, restartCallbacks), None, timeout) + + def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean): T = + newInstance(intf, target, new Dispatcher(transactionRequired, None), None, timeout) + + def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, restartCallbacks: Option[RestartCallbacks]): T = + newInstance(intf, target, new Dispatcher(transactionRequired, restartCallbacks), None, timeout) def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T = - newInstance(target, new Dispatcher(None), Some(new InetSocketAddress(hostname, port)), timeout) + newInstance(target, new Dispatcher(false, None), Some(new InetSocketAddress(hostname, port)), timeout) def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = - newInstance(target, new Dispatcher(restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout) + newInstance(target, new Dispatcher(false, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout) def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int): T = - newInstance(intf, target, new Dispatcher(None), Some(new InetSocketAddress(hostname, port)), timeout) + newInstance(intf, target, new Dispatcher(false, None), Some(new InetSocketAddress(hostname, port)), timeout) def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = - newInstance(intf, target, new Dispatcher(restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout) + newInstance(intf, target, new Dispatcher(false, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout) + + def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, hostname: String, port: Int): T = + newInstance(target, new Dispatcher(transactionRequired, None), Some(new InetSocketAddress(hostname, port)), timeout) + + def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = + newInstance(target, new Dispatcher(transactionRequired, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout) + + def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, hostname: String, port: Int): T = + newInstance(intf, target, new Dispatcher(transactionRequired, None), Some(new InetSocketAddress(hostname, port)), timeout) + + def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = + newInstance(intf, target, new Dispatcher(transactionRequired, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout) def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher): T = { - val actor = new Dispatcher(None) + val actor = new Dispatcher(false, None) actor.messageDispatcher = dispatcher newInstance(target, actor, None, timeout) } def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { - val actor = new Dispatcher(restartCallbacks) + val actor = new Dispatcher(false, restartCallbacks) actor.messageDispatcher = dispatcher newInstance(target, actor, None, timeout) } def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher): T = { - val actor = new Dispatcher(None) + val actor = new Dispatcher(false, None) actor.messageDispatcher = dispatcher newInstance(intf, target, actor, None, timeout) } def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { - val actor = new Dispatcher(restartCallbacks) + val actor = new Dispatcher(false, restartCallbacks) + actor.messageDispatcher = dispatcher + newInstance(intf, target, actor, None, timeout) + } + + def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher): T = { + val actor = new Dispatcher(transactionRequired, None) + actor.messageDispatcher = dispatcher + newInstance(target, actor, None, timeout) + } + + def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { + val actor = new Dispatcher(transactionRequired, restartCallbacks) + actor.messageDispatcher = dispatcher + newInstance(target, actor, None, timeout) + } + + def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher): T = { + val actor = new Dispatcher(transactionRequired, None) + actor.messageDispatcher = dispatcher + newInstance(intf, target, actor, None, timeout) + } + + def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { + val actor = new Dispatcher(transactionRequired, restartCallbacks) actor.messageDispatcher = dispatcher newInstance(intf, target, actor, None, timeout) } def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { - val actor = new Dispatcher(None) + val actor = new Dispatcher(false, None) actor.messageDispatcher = dispatcher newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { - val actor = new Dispatcher(restartCallbacks) + val actor = new Dispatcher(false, restartCallbacks) actor.messageDispatcher = dispatcher newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { - val actor = new Dispatcher(None) + val actor = new Dispatcher(false, None) actor.messageDispatcher = dispatcher newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { - val actor = new Dispatcher(restartCallbacks) + val actor = new Dispatcher(false, restartCallbacks) + actor.messageDispatcher = dispatcher + newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) + } + + def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { + val actor = new Dispatcher(transactionRequired, None) + actor.messageDispatcher = dispatcher + newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) + } + + def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { + val actor = new Dispatcher(transactionRequired, restartCallbacks) + actor.messageDispatcher = dispatcher + newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) + } + + def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { + val actor = new Dispatcher(transactionRequired, None) + actor.messageDispatcher = dispatcher + newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) + } + + def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { + val actor = new Dispatcher(transactionRequired, restartCallbacks) actor.messageDispatcher = dispatcher newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } @@ -282,7 +354,7 @@ private[akka] sealed class ActiveObjectAspect { * * @author Jonas Bonér */ -private[akka] class Dispatcher(val callbacks: Option[RestartCallbacks]) extends Actor { +private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Option[RestartCallbacks]) extends Actor { private val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]() private val ZERO_ITEM_OBJECT_ARRAY = Array[Object[_]]() @@ -292,7 +364,7 @@ private[akka] class Dispatcher(val callbacks: Option[RestartCallbacks]) extends private var initTxState: Option[Method] = None private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef) = { - if (targetClass.isAnnotationPresent(Annotations.transactionrequired)) makeTransactionRequired + if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired)) makeTransactionRequired id = targetClass.getName target = Some(targetInstance) val methods = targetInstance.getClass.getDeclaredMethods.toList diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index 85d39a97d7..ed021c1e16 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -66,7 +66,7 @@ class ActorMessageInvoker(val actor: Actor) extends MessageInvoker { /** * @author Jonas Bonér */ -object Actor { +object Actor extends Logging { val TIMEOUT = config.getInt("akka.actor.timeout", 5000) val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) @@ -74,7 +74,7 @@ object Actor { implicit val Self: AnyRef = this def receive = { case unknown => - log.error( + Actor.log.error( "Actor.Sender can't process messages. Received message [%s]." + "This error could occur if you either:" + "\n\t- Explicitly send a message to the Actor.Sender object." + @@ -115,7 +115,7 @@ object Actor { * * */ - def actor[A](body: => Unit) = { + def actor[A](body: => Unit) = { def handler[A](body: Unit) = new { def receive(handler: PartialFunction[Any, Unit]) = new Actor() { start @@ -129,7 +129,7 @@ object Actor { /** * Use to create an anonymous event-driven actor with a body but no message loop block. *
- * This actor can not respond to any messages but can be used as a simple way to + * This actor can not respond to any messages but can be used as a simple way to * spawn a lightweight thread to process some task. * * The actor is started when created. @@ -163,7 +163,7 @@ object Actor { * */ def actor(lifeCycleConfig: LifeCycle)(body: PartialFunction[Any, Unit]): Actor = new Actor() { - lifeCycle = lifeCycleConfig + lifeCycle = Some(lifeCycleConfig) start def receive = body } @@ -183,10 +183,10 @@ object Actor { * } * */ - def actor[A](lifeCycleConfig: LifeCycle)(body: => Unit) = { + def actor[A](lifeCycleConfig: LifeCycle)(body: => Unit) = { def handler[A](body: Unit) = new { def receive(handler: PartialFunction[Any, Unit]) = new Actor() { - lifeCycle = lifeCycleConfig + lifeCycle = Some(lifeCycleConfig) start body def receive = handler @@ -209,15 +209,15 @@ object Actor { * * @author Jonas Bonér */ -trait Actor extends Logging with TransactionManagement { +trait Actor extends TransactionManagement { ActorRegistry.register(this) implicit protected val self: Actor = this - + // FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait private[akka] var _uuid = Uuid.newUuid.toString def uuid = _uuid - + // ==================================== // private fields // ==================================== @@ -226,9 +226,9 @@ trait Actor extends Logging with TransactionManagement { @volatile private var _isShutDown: Boolean = false private var _hotswap: Option[PartialFunction[Any, Unit]] = None private var _config: Option[AnyRef] = None - private val _remoteFlagLock = new ReadWriteLock + private val _remoteFlagLock = new ReadWriteLock private[akka] var _remoteAddress: Option[InetSocketAddress] = None - private[akka] val _linkedActors = new HashSet[Actor] + private[akka] var _linkedActors: Option[HashSet[Actor]] = None private[akka] var _mailbox: MessageQueue = _ private[akka] var _supervisor: Option[Actor] = None @@ -244,7 +244,7 @@ trait Actor extends Logging with TransactionManagement { * * This sender reference can be used together with the '!' method for request/reply * message exchanges and which is in many ways better than using the '!!' method - * which will make the sender wait for a reply using a *blocking* future. + * which will make the sender wait for a reply using a *blocking* future. */ protected[this] var sender: Option[Actor] = None @@ -284,7 +284,7 @@ trait Actor extends Logging with TransactionManagement { * * You can override it so it fits the specific use-case that the actor is used for. * See the se.scalablesolutions.akka.dispatch.Dispatchers class for the different - * dispatchers available. + * dispatchers available. * * The default is also that all actors that are created and spawned from within this actor * is sharing the same dispatcher as its creator. @@ -301,7 +301,7 @@ trait Actor extends Logging with TransactionManagement { * Set trapExit to the list of exception classes that the actor should be able to trap * from the actor it is supervising. When the supervising actor throws these exceptions * then they will trigger a restart. - * + * *
* // trap all exceptions
* trapExit = List(classOf[Throwable])
@@ -328,9 +328,9 @@ trait Actor extends Logging with TransactionManagement {
/**
* User overridable callback/setting.
*
- * Defines the life-cycle for a supervised actor. Default is 'LifeCycle(Permanent)' but can be overridden.
+ * Defines the life-cycle for a supervised actor.
*/
- @volatile var lifeCycle: LifeCycle = LifeCycle(Permanent)
+ @volatile var lifeCycle: Option[LifeCycle] = None
/**
* User overridable callback/setting.
@@ -411,14 +411,14 @@ trait Actor extends Logging with TransactionManagement {
* Starts up the actor and its message queue.
*/
def start: Actor = synchronized {
- if (_isShutDown) throw new IllegalStateException("Can't restart an actor that have been shut down with 'exit'")
+ if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'exit'")
if (!_isRunning) {
dispatcher.registerHandler(this, new ActorMessageInvoker(this))
messageDispatcher.start
_isRunning = true
//if (isTransactional) this !! TransactionalInit
}
- log.info("[%s] has started", toString)
+ Actor.log.info("[%s] has started", toString)
this
}
@@ -438,6 +438,7 @@ trait Actor extends Logging with TransactionManagement {
_isRunning = false
_isShutDown = true
shutdown
+ ActorRegistry.unregister(this)
}
}
@@ -447,7 +448,7 @@ trait Actor extends Logging with TransactionManagement {
*
* If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument.
*
- *
+ *
* This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable.
*
* actor ! message
@@ -513,7 +514,7 @@ trait Actor extends Logging with TransactionManagement {
getResultOrThrowException(future)
} else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
-
+
/**
* Sends a message asynchronously and waits on a future for a reply message.
*
@@ -529,10 +530,10 @@ trait Actor extends Logging with TransactionManagement {
def !: Option[T] = !
/**
- * This method is evil and have been removed. Use '!!' with a timeout instead.
+ * This method is evil and has been removed. Use '!!' with a timeout instead.
*/
def !?[T](message: AnyRef): T = throw new UnsupportedOperationException(
- "'!?' is evil and have been removed. Use '!!' with a timeout instead")
+ "'!?' is evil and has been removed. Use '!!' with a timeout instead")
/**
* Use reply(..) to reply with a message to the original sender of the message currently
@@ -551,7 +552,8 @@ trait Actor extends Logging with TransactionManagement {
"\n\t\t1. Send a message to a remote actor" +
"\n\t\t2. Send a message from an instance that is *not* an actor" +
"\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " +
- "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future that will be bound by the argument passed to 'reply'." )
+ "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" +
+ "\n\tthat will be bound by the argument passed to 'reply'." )
case Some(future) =>
future.completeWithResult(message)
}
@@ -574,7 +576,7 @@ trait Actor extends Logging with TransactionManagement {
} else throw new IllegalArgumentException(
"Can not swap dispatcher for " + toString + " after it has been started")
}
-
+
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
@@ -615,11 +617,11 @@ trait Actor extends Logging with TransactionManagement {
*/
protected[this] def link(actor: Actor) = {
if (_isRunning) {
- _linkedActors.add(actor)
+ getLinkedActors.add(actor)
if (actor._supervisor.isDefined) throw new IllegalStateException(
"Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails")
actor._supervisor = Some(this)
- log.debug("Linking actor [%s] to actor [%s]", actor, this)
+ Actor.log.debug("Linking actor [%s] to actor [%s]", actor, this)
} else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
@@ -631,10 +633,11 @@ trait Actor extends Logging with TransactionManagement {
*/
protected[this] def unlink(actor: Actor) = {
if (_isRunning) {
- if (!_linkedActors.contains(actor)) throw new IllegalStateException("Actor [" + actor + "] is not a linked actor, can't unlink")
- _linkedActors.remove(actor)
+ if (!getLinkedActors.contains(actor)) throw new IllegalStateException(
+ "Actor [" + actor + "] is not a linked actor, can't unlink")
+ getLinkedActors.remove(actor)
actor._supervisor = None
- log.debug("Unlinking actor [%s] from actor [%s]", actor, this)
+ Actor.log.debug("Unlinking actor [%s] from actor [%s]", actor, this)
} else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
@@ -741,7 +744,7 @@ trait Actor extends Logging with TransactionManagement {
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long):
CompletableFutureResult = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
if (_remoteAddress.isDefined) {
- val requestBuilder = RemoteRequest.newBuilder
+ val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(this.getClass.getName)
.setTimeout(timeout)
@@ -773,7 +776,7 @@ trait Actor extends Logging with TransactionManagement {
else dispatch(messageHandle)
} catch {
case e =>
- log.error(e, "Could not invoke actor [%s]", this)
+ Actor.log.error(e, "Could not invoke actor [%s]", this)
throw e
}
}
@@ -790,7 +793,7 @@ trait Actor extends Logging with TransactionManagement {
else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString)
} catch {
case e =>
- log.error(e, "Could not invoke actor [%s]", this)
+ Actor.log.error(e, "Could not invoke actor [%s]", this)
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e)
@@ -801,7 +804,7 @@ trait Actor extends Logging with TransactionManagement {
private def transactionalDispatch[T](messageHandle: MessageInvocation) = {
setTransaction(messageHandle.tx)
-
+
val message = messageHandle.message //serializeMessage(messageHandle.message)
senderFuture = messageHandle.future
sender = messageHandle.sender
@@ -817,7 +820,7 @@ trait Actor extends Logging with TransactionManagement {
decrementTransaction
}
}
-
+
try {
if (isTransactionRequiresNew && !isTransactionInScope) {
if (senderFuture.isEmpty) throw new StmException(
@@ -830,7 +833,7 @@ trait Actor extends Logging with TransactionManagement {
} else proceed
} catch {
case e =>
- log.error(e, "Could not invoke actor [%s]", this)
+ Actor.log.error(e, "Could not invoke actor [%s]", this)
if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e)
clearTransaction // need to clear currentTransaction before call to supervisor
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
@@ -871,15 +874,16 @@ trait Actor extends Logging with TransactionManagement {
}
private[this] def restartLinkedActors(reason: AnyRef) = {
- _linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor =>
- actor.lifeCycle match {
+ getLinkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor =>
+ if (actor.lifeCycle.isEmpty) actor.lifeCycle = Some(LifeCycle(Permanent))
+ actor.lifeCycle.get match {
case LifeCycle(scope, _) => {
scope match {
case Permanent =>
actor.restart(reason)
case Temporary =>
- log.info("Actor [%s] configured as TEMPORARY will not be restarted.", actor.id)
- _linkedActors.remove(actor) // remove the temporary actor
+ Actor.log.info("Actor [%s] configured as TEMPORARY will not be restarted.", actor.id)
+ getLinkedActors.remove(actor) // remove the temporary actor
}
}
}
@@ -888,7 +892,7 @@ trait Actor extends Logging with TransactionManagement {
private[Actor] def restart(reason: AnyRef) = synchronized {
preRestart(reason, _config)
- log.info("Restarting actor [%s] configured as PERMANENT.", id)
+ Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
postRestart(reason, _config)
}
@@ -899,6 +903,13 @@ trait Actor extends Logging with TransactionManagement {
} else None
}
+ protected def getLinkedActors: HashSet[Actor] = {
+ if (_linkedActors.isEmpty) {
+ val set = new HashSet[Actor]
+ _linkedActors = Some(set)
+ set
+ } else _linkedActors.get
+ }
private[akka] def swapDispatcher(disp: MessageDispatcher) = synchronized {
messageDispatcher = disp
diff --git a/akka-actors/src/main/scala/actor/ActorRegistry.scala b/akka-actors/src/main/scala/actor/ActorRegistry.scala
index fc40a9003a..327a7e6395 100755
--- a/akka-actors/src/main/scala/actor/ActorRegistry.scala
+++ b/akka-actors/src/main/scala/actor/ActorRegistry.scala
@@ -43,4 +43,9 @@ object ActorRegistry {
case None => actorsById + (id -> (actor :: Nil))
}
}
+
+ def unregister(actor: Actor) = synchronized {
+ actorsByClassName - actor.getClass.getName
+ actorsById - actor.getClass.getName
+ }
}
diff --git a/akka-actors/src/main/scala/actor/Scheduler.scala b/akka-actors/src/main/scala/actor/Scheduler.scala
index 9b9ee8bc7e..6266c17942 100644
--- a/akka-actors/src/main/scala/actor/Scheduler.scala
+++ b/akka-actors/src/main/scala/actor/Scheduler.scala
@@ -28,7 +28,7 @@ case class SchedulerException(msg: String, e: Throwable) extends RuntimeExceptio
* which is licensed under the Apache 2 License.
*/
class ScheduleActor(val receiver: Actor, val future: ScheduledFuture[AnyRef]) extends Actor with Logging {
- lifeCycle = LifeCycle(Permanent)
+ lifeCycle = Some(LifeCycle(Permanent))
def receive = {
case UnSchedule =>
diff --git a/akka-actors/src/main/scala/actor/Supervisor.scala b/akka-actors/src/main/scala/actor/Supervisor.scala
index ec65fbfec7..e2c5e92ac2 100644
--- a/akka-actors/src/main/scala/actor/Supervisor.scala
+++ b/akka-actors/src/main/scala/actor/Supervisor.scala
@@ -23,7 +23,7 @@ case class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends
*
* val factory = SupervisorFactory(
* SupervisorConfig(
- * RestartStrategy(OneForOne, 3, 10),
+ * RestartStrategy(OneForOne, 3, 10, List(classOf[Exception]),
* Supervise(
* myFirstActor,
* LifeCycle(Permanent)) ::
@@ -43,6 +43,7 @@ case class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends
* @author Jonas Bonér
*/
class SupervisorFactory(val config: SupervisorConfig) extends Logging {
+ type ExceptionList = List[Class[_ <: Throwable]]
def newInstance: Supervisor = newInstanceFor(config)
@@ -55,10 +56,10 @@ class SupervisorFactory(val config: SupervisorConfig) extends Logging {
}
protected def create(strategy: RestartStrategy): Supervisor = strategy match {
- case RestartStrategy(scheme, maxNrOfRetries, timeRange) =>
+ case RestartStrategy(scheme, maxNrOfRetries, timeRange, trapExceptions: ExceptionList) =>
scheme match {
- case AllForOne => new Supervisor(AllForOneStrategy(maxNrOfRetries, timeRange))
- case OneForOne => new Supervisor(OneForOneStrategy(maxNrOfRetries, timeRange))
+ case AllForOne => new Supervisor(AllForOneStrategy(maxNrOfRetries, timeRange), trapExceptions)
+ case OneForOne => new Supervisor(OneForOneStrategy(maxNrOfRetries, timeRange), trapExceptions)
}
}
}
@@ -79,23 +80,23 @@ object SupervisorFactory {
*
* @author Jonas Bonér
*/
-sealed class Supervisor private[akka] (handler: FaultHandlingStrategy)
- extends Actor with Logging with Configurator {
- trapExit = List(classOf[Throwable])
+sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]])
+ extends Actor with Logging with Configurator {
+
+ trapExit = trapExceptions
faultHandler = Some(handler)
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
- val actors = new ConcurrentHashMap[String, Actor]
+ private val actors = new ConcurrentHashMap[String, Actor]
+ // Cheating, should really go through the dispatcher rather than direct access to a CHM
def getInstance[T](clazz: Class[T]) = actors.get(clazz.getName).asInstanceOf[T]
-
def getComponentInterfaces: List[Class[_]] = actors.values.toArray.toList.map(_.getClass)
-
def isDefined(clazz: Class[_]): Boolean = actors.containsKey(clazz.getName)
- override def start = synchronized {
+ override def start: Actor = synchronized {
ConfiguratorRepository.registerConfigurator(this)
- _linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor =>
+ getLinkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor =>
actor.start
log.info("Starting actor: %s", actor)
}
@@ -104,7 +105,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy)
override def stop = synchronized {
super[Actor].stop
- _linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor =>
+ getLinkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor =>
actor.stop
log.info("Shutting actor down: %s", actor)
}
@@ -112,7 +113,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy)
}
def receive = {
- case unknown => throw new IllegalArgumentException("Supervisor does not respond to any messages. Unknown message [" + unknown + "]")
+ case unknown => throw new IllegalArgumentException("Supervisor " + toString + " does not respond to any messages. Unknown message [" + unknown + "]")
}
def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match {
@@ -121,12 +122,14 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy)
server match {
case Supervise(actor, lifeCycle) =>
actors.put(actor.getClass.getName, actor)
- actor.lifeCycle = lifeCycle
+ actor.lifeCycle = Some(lifeCycle)
startLink(actor)
- case SupervisorConfig(_, _) => // recursive configuration
- factory.newInstanceFor(server.asInstanceOf[SupervisorConfig]).start
- // FIXME what to do with recursively supervisors?
+ case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
+ val supervisor = factory.newInstanceFor(supervisorConfig).start
+ supervisor.lifeCycle = Some(LifeCycle(Permanent))
+ actors.put(supervisor.getClass.getName, supervisor)
+ link(supervisor)
})
}
}
diff --git a/akka-actors/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/akka-actors/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
index 04f90c331d..93a1e9c4e2 100644
--- a/akka-actors/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
+++ b/akka-actors/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
@@ -19,6 +19,8 @@ import java.net.InetSocketAddress
import java.lang.reflect.Method
/**
+ * This is an class for internal usage. Instead use the se.scalablesolutions.akka.config.ActiveObjectConfigurator class for creating ActiveObjects.
+ *
* @author Jonas Bonér
*/
private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfiguratorBase with Logging { // with CamelConfigurator {
@@ -44,9 +46,12 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
*/
override def getInstance[T](clazz: Class[T]): T = synchronized {
log.debug("Retrieving active object [%s]", clazz.getName)
- if (injector == null) throw new IllegalStateException("inject() and/or supervise() must be called before invoking getInstance(clazz)")
+ if (injector == null) throw new IllegalStateException(
+ "inject() and/or supervise() must be called before invoking getInstance(clazz)")
val (proxy, targetInstance, component) =
- activeObjectRegistry.getOrElse(clazz, throw new IllegalStateException("Class [" + clazz.getName + "] has not been put under supervision (by passing in the config to the 'configure' and then invoking 'supervise') method"))
+ activeObjectRegistry.getOrElse(clazz, throw new IllegalStateException(
+ "Class [" + clazz.getName + "] has not been put under supervision " +
+ "(by passing in the config to the 'configure' and then invoking 'supervise') method"))
injector.injectMembers(targetInstance)
proxy.asInstanceOf[T]
}
@@ -96,10 +101,12 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
private def newSubclassingProxy(component: Component): DependencyBinding = {
val targetClass = component.target
- val actor = new Dispatcher(component.lifeCycle.callbacks)
+ val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)
if (component.dispatcher.isDefined) actor.swapDispatcher(component.dispatcher.get)
val remoteAddress =
- if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
+ if (component.remoteAddress.isDefined)
+ Some(new InetSocketAddress(
+ component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
val proxy = ActiveObject.newInstance(targetClass, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
supervised ::= Supervise(actor, component.lifeCycle)
@@ -111,12 +118,14 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
val targetClass = component.intf.get
val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry
component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true)
- val actor = new Dispatcher(component.lifeCycle.callbacks)
+ val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)
if (component.dispatcher.isDefined) actor.swapDispatcher(component.dispatcher.get)
val remoteAddress =
- if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
+ if (component.remoteAddress.isDefined)
+ Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
- val proxy = ActiveObject.newInstance(targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
+ val proxy = ActiveObject.newInstance(
+ targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
supervised ::= Supervise(actor, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, targetInstance, component))
new DependencyBinding(targetClass, proxy)
diff --git a/akka-actors/src/main/scala/config/Config.scala b/akka-actors/src/main/scala/config/Config.scala
index 39899b73d5..136725cadd 100644
--- a/akka-actors/src/main/scala/config/Config.scala
+++ b/akka-actors/src/main/scala/config/Config.scala
@@ -22,7 +22,11 @@ object ScalaConfig {
case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server
case class Supervise(actor: Actor, lifeCycle: LifeCycle) extends Server
- case class RestartStrategy(scheme: FailOverScheme, maxNrOfRetries: Int, withinTimeRange: Int) extends ConfigElement
+ case class RestartStrategy(
+ scheme: FailOverScheme,
+ maxNrOfRetries: Int,
+ withinTimeRange: Int,
+ trapExceptions: List[Class[_ <: Throwable]]) extends ConfigElement
case object AllForOne extends FailOverScheme
case object OneForOne extends FailOverScheme
@@ -44,6 +48,7 @@ object ScalaConfig {
val target: Class[_],
val lifeCycle: LifeCycle,
val timeout: Int,
+ val transactionRequired: Boolean,
_dispatcher: MessageDispatcher, // optional
_remoteAddress: RemoteAddress // optional
) extends Server {
@@ -53,28 +58,52 @@ object ScalaConfig {
}
object Component {
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int) =
- new Component(intf, target, lifeCycle, timeout, null, null)
+ new Component(intf, target, lifeCycle, timeout, false, null, null)
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int) =
- new Component(null, target, lifeCycle, timeout, null, null)
+ new Component(null, target, lifeCycle, timeout, false, null, null)
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher) =
- new Component(intf, target, lifeCycle, timeout, dispatcher, null)
+ new Component(intf, target, lifeCycle, timeout, false, dispatcher, null)
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher) =
- new Component(null, target, lifeCycle, timeout, dispatcher, null)
+ new Component(null, target, lifeCycle, timeout, false, dispatcher, null)
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, remoteAddress: RemoteAddress) =
- new Component(intf, target, lifeCycle, timeout, null, remoteAddress)
+ new Component(intf, target, lifeCycle, timeout, false, null, remoteAddress)
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int, remoteAddress: RemoteAddress) =
- new Component(null, target, lifeCycle, timeout, null, remoteAddress)
+ new Component(null, target, lifeCycle, timeout, false, null, remoteAddress)
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
- new Component(intf, target, lifeCycle, timeout, dispatcher, remoteAddress)
+ new Component(intf, target, lifeCycle, timeout, false, dispatcher, remoteAddress)
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
- new Component(null, target, lifeCycle, timeout, dispatcher, remoteAddress)
+ new Component(null, target, lifeCycle, timeout, false, dispatcher, remoteAddress)
+
+ def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean) =
+ new Component(intf, target, lifeCycle, timeout, transactionRequired, null, null)
+
+ def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean) =
+ new Component(null, target, lifeCycle, timeout, transactionRequired, null, null)
+
+ def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, dispatcher: MessageDispatcher) =
+ new Component(intf, target, lifeCycle, timeout, transactionRequired, dispatcher, null)
+
+ def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, dispatcher: MessageDispatcher) =
+ new Component(null, target, lifeCycle, timeout, transactionRequired, dispatcher, null)
+
+ def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, remoteAddress: RemoteAddress) =
+ new Component(intf, target, lifeCycle, timeout, transactionRequired, null, remoteAddress)
+
+ def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, remoteAddress: RemoteAddress) =
+ new Component(null, target, lifeCycle, timeout, transactionRequired, null, remoteAddress)
+
+ def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
+ new Component(intf, target, lifeCycle, timeout, transactionRequired, dispatcher, remoteAddress)
+
+ def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
+ new Component(null, target, lifeCycle, timeout, transactionRequired, dispatcher, remoteAddress)
}
}
@@ -89,9 +118,10 @@ object JavaConfig {
class RestartStrategy(
@BeanProperty val scheme: FailOverScheme,
@BeanProperty val maxNrOfRetries: Int,
- @BeanProperty val withinTimeRange: Int) extends ConfigElement {
+ @BeanProperty val withinTimeRange: Int,
+ @BeanProperty val trapExceptions: Array[Class[_ <: Throwable]]) extends ConfigElement {
def transform = se.scalablesolutions.akka.config.ScalaConfig.RestartStrategy(
- scheme.transform, maxNrOfRetries, withinTimeRange)
+ scheme.transform, maxNrOfRetries, withinTimeRange, trapExceptions.toList)
}
class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val callbacks: RestartCallbacks) extends ConfigElement {
@@ -133,33 +163,56 @@ object JavaConfig {
@BeanProperty val target: Class[_],
@BeanProperty val lifeCycle: LifeCycle,
@BeanProperty val timeout: Int,
+ @BeanProperty val transactionRequired: Boolean, // optional
@BeanProperty val dispatcher: MessageDispatcher, // optional
@BeanProperty val remoteAddress: RemoteAddress // optional
) extends Server {
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int) =
- this(intf, target, lifeCycle, timeout, null, null)
+ this(intf, target, lifeCycle, timeout, false, null, null)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int) =
- this(null, target, lifeCycle, timeout, null, null)
+ this(null, target, lifeCycle, timeout, false, null, null)
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, remoteAddress: RemoteAddress) =
- this(intf, target, lifeCycle, timeout, null, remoteAddress)
+ this(intf, target, lifeCycle, timeout, false, null, remoteAddress)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int, remoteAddress: RemoteAddress) =
- this(null, target, lifeCycle, timeout, null, remoteAddress)
+ this(null, target, lifeCycle, timeout, false, null, remoteAddress)
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher) =
- this(intf, target, lifeCycle, timeout, dispatcher, null)
+ this(intf, target, lifeCycle, timeout, false, dispatcher, null)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher) =
- this(null, target, lifeCycle, timeout, dispatcher, null)
+ this(null, target, lifeCycle, timeout, false, dispatcher, null)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
- this(null, target, lifeCycle, timeout, dispatcher, remoteAddress)
+ this(null, target, lifeCycle, timeout, false, dispatcher, remoteAddress)
+
+ def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean) =
+ this(intf, target, lifeCycle, timeout, transactionRequired, null, null)
+
+ def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean) =
+ this(null, target, lifeCycle, timeout, transactionRequired, null, null)
+
+ def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, remoteAddress: RemoteAddress) =
+ this(intf, target, lifeCycle, timeout, transactionRequired, null, remoteAddress)
+
+ def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, remoteAddress: RemoteAddress) =
+ this(null, target, lifeCycle, timeout, transactionRequired, null, remoteAddress)
+
+ def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, dispatcher: MessageDispatcher) =
+ this(intf, target, lifeCycle, timeout, transactionRequired, dispatcher, null)
+
+ def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, dispatcher: MessageDispatcher) =
+ this(null, target, lifeCycle, timeout, transactionRequired, dispatcher, null)
+
+ def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
+ this(null, target, lifeCycle, timeout, transactionRequired, dispatcher, remoteAddress)
def transform =
- se.scalablesolutions.akka.config.ScalaConfig.Component(intf, target, lifeCycle.transform, timeout, dispatcher,
+ se.scalablesolutions.akka.config.ScalaConfig.Component(
+ intf, target, lifeCycle.transform, timeout, transactionRequired, dispatcher,
if (remoteAddress != null) se.scalablesolutions.akka.config.ScalaConfig.RemoteAddress(remoteAddress.hostname, remoteAddress.port) else null)
def newSupervised(actor: Actor) =
diff --git a/akka-actors/src/main/scala/config/Configurator.scala b/akka-actors/src/main/scala/config/Configurator.scala
index c87138169d..3da41f2e94 100644
--- a/akka-actors/src/main/scala/config/Configurator.scala
+++ b/akka-actors/src/main/scala/config/Configurator.scala
@@ -6,7 +6,10 @@ package se.scalablesolutions.akka.config
import ScalaConfig.{RestartStrategy, Component}
-trait Configurator {
+/**
+ * Manages the active abject or actor that has been put under supervision for the class specified.
+ */
+private[akka] trait Configurator {
/**
* Returns the active abject or actor that has been put under supervision for the class specified.
*
diff --git a/akka-actors/src/main/scala/stm/Transaction.scala b/akka-actors/src/main/scala/stm/Transaction.scala
index d34661c37a..09222366bb 100644
--- a/akka-actors/src/main/scala/stm/Transaction.scala
+++ b/akka-actors/src/main/scala/stm/Transaction.scala
@@ -70,7 +70,8 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
* // For example, if you have a List with TransactionalRef
* val refs: List[TransactionalRef] = ...
*
- * // You can use them together with Transaction in a for comprehension since TransactionalRef is also monadic
+ * // You can use them together with Transaction in a for comprehension since
+ * // TransactionalRef is also monadic
* for {
* tx <- Transaction
* ref <- refs
@@ -107,7 +108,8 @@ object Transaction extends TransactionManagement {
def foreach(f: Transaction => Unit): Unit = atomic { f(getTransactionInScope) }
/**
- * Creates a "pure" STM atomic transaction and by-passes all transactions hooks such as persistence etc.
+ * Creates a "pure" STM atomic transaction and by-passes all transactions hooks
+ * such as persistence etc.
* Only for internal usage.
*/
private[akka] def pureAtomic[T](body: => T): T = new AtomicTemplate[T](
@@ -246,13 +248,18 @@ object Transaction extends TransactionManagement {
private[akka] def register(uuid: String, storage: Committable) = persistentStateMap.put(uuid, storage)
private def ensureIsActive = if (status != TransactionStatus.Active)
- throw new IllegalStateException("Expected ACTIVE transaction - current status [" + status + "]: " + toString)
+ throw new IllegalStateException(
+ "Expected ACTIVE transaction - current status [" + status + "]: " + toString)
- private def ensureIsActiveOrAborted = if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted))
- throw new IllegalStateException("Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString)
+ private def ensureIsActiveOrAborted =
+ if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted))
+ throw new IllegalStateException(
+ "Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString)
- private def ensureIsActiveOrNew = if (!(status == TransactionStatus.Active || status == TransactionStatus.New))
- throw new IllegalStateException("Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
+ private def ensureIsActiveOrNew =
+ if (!(status == TransactionStatus.Active || status == TransactionStatus.New))
+ throw new IllegalStateException(
+ "Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
// For reinitialize transaction after sending it over the wire
private[akka] def reinit = synchronized {
diff --git a/akka-actors/src/main/scala/stm/TransactionManagement.scala b/akka-actors/src/main/scala/stm/TransactionManagement.scala
index 6a3ee9abf8..8e63b9c0d1 100644
--- a/akka-actors/src/main/scala/stm/TransactionManagement.scala
+++ b/akka-actors/src/main/scala/stm/TransactionManagement.scala
@@ -8,24 +8,20 @@ import java.util.concurrent.atomic.AtomicBoolean
import se.scalablesolutions.akka.util.Logging
-import scala.collection.mutable.HashSet
-
import org.multiverse.api.ThreadLocalTransaction._
class StmException(msg: String) extends RuntimeException(msg)
-class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
+class TransactionAwareWrapperException(
+ val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
override def toString(): String = "TransactionAwareWrapperException[" + cause + ", " + tx + "]"
}
object TransactionManagement extends TransactionManagement {
import se.scalablesolutions.akka.Config._
- // FIXME reenable 'akka.stm.restart-on-collision' when new STM is in place
- val RESTART_TRANSACTION_ON_COLLISION = false //akka.Kernel.config.getBool("akka.stm.restart-on-collision", true)
- val TIME_WAITING_FOR_COMPLETION = config.getInt("akka.stm.wait-for-completion", 1000)
- val NR_OF_TIMES_WAITING_FOR_COMPLETION = config.getInt("akka.stm.wait-nr-of-times", 3)
- val MAX_NR_OF_RETRIES = config.getInt("akka.stm.max-nr-of-retries", 100)
- val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", false))
+
+ val MAX_NR_OF_RETRIES = config.getInt("akka.stm.max-nr-of-retries", 100)
+ val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", false))
def isTransactionalityEnabled = TRANSACTION_ENABLED.get
def disableTransactions = TRANSACTION_ENABLED.set(false)
@@ -37,7 +33,6 @@ object TransactionManagement extends TransactionManagement {
trait TransactionManagement extends Logging {
import TransactionManagement.currentTransaction
- private[akka] val activeTransactions = new HashSet[Transaction]
private[akka] def createNewTransaction = currentTransaction.set(Some(new Transaction))
@@ -60,7 +55,5 @@ trait TransactionManagement extends Logging {
private[akka] def incrementTransaction = if (isTransactionInScope) getTransactionInScope.increment
private[akka] def decrementTransaction = if (isTransactionInScope) getTransactionInScope.decrement
-
- private[akka] def removeTransactionIfTopLevel(tx: Transaction) = if (tx.isTopLevel) { activeTransactions -= tx }
}
diff --git a/akka-actors/src/main/scala/stm/TransactionalState.scala b/akka-actors/src/main/scala/stm/TransactionalState.scala
index bd89a41224..16fc4f31f8 100644
--- a/akka-actors/src/main/scala/stm/TransactionalState.scala
+++ b/akka-actors/src/main/scala/stm/TransactionalState.scala
@@ -44,8 +44,7 @@ object TransactionalState {
*/
@serializable
trait Transactional {
- // FIXME: won't work across the remote machines, use [http://johannburkard.de/software/uuid/]
- var uuid = Uuid.newUuid.toString
+ val uuid: String
}
/**
@@ -75,6 +74,7 @@ object TransactionalRef {
*/
class TransactionalRef[T] extends Transactional {
import org.multiverse.api.ThreadLocalTransaction._
+ val uuid = Uuid.newUuid.toString
private[this] val ref: Ref[T] = atomic { new Ref }
@@ -126,6 +126,8 @@ object TransactionalMap {
*/
class TransactionalMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] {
protected[this] val ref = TransactionalRef[HashTrie[K, V]]
+ val uuid = Uuid.newUuid.toString
+
ref.swap(new HashTrie[K, V])
def -=(key: K) = remove(key)
@@ -176,7 +178,10 @@ object TransactionalVector {
* @author Jonas Bonér
*/
class TransactionalVector[T] extends Transactional with RandomAccessSeq[T] {
+ val uuid = Uuid.newUuid.toString
+
private[this] val ref = TransactionalRef[Vector[T]]
+
ref.swap(EmptyVector)
def clear = ref.swap(EmptyVector)
diff --git a/akka-actors/src/test/scala/MemoryTest.scala b/akka-actors/src/test/scala/MemoryTest.scala
new file mode 100644
index 0000000000..083b964bc7
--- /dev/null
+++ b/akka-actors/src/test/scala/MemoryTest.scala
@@ -0,0 +1,32 @@
+package se.scalablesolutions.akka.actor
+
+import junit.framework.TestCase
+
+import org.scalatest.junit.JUnitSuite
+import org.junit.Test
+import scala.collection.mutable.HashSet
+
+class MemoryFootprintTest extends JUnitSuite {
+ class Mem extends Actor {
+ def receive = {
+ case _ => {}
+ }
+ }
+
+ @Test
+ def shouldCreateManyActors = {
+ /* println("============== MEMORY TEST ==============")
+ val actors = new HashSet[Actor]
+ println("Total memory: " + Runtime.getRuntime.totalMemory)
+ (1 until 1000000).foreach {i =>
+ val mem = new Mem
+ actors += mem
+ if ((i % 100000) == 0) {
+ println("Nr actors: " + i)
+ println("Total memory: " + (Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory))
+ }
+ }
+ */
+ assert(true)
+ }
+}
diff --git a/akka-actors/src/test/scala/RemoteSupervisorTest.scala b/akka-actors/src/test/scala/RemoteSupervisorTest.scala
index 0fef2a2e6e..719c88359f 100644
--- a/akka-actors/src/test/scala/RemoteSupervisorTest.scala
+++ b/akka-actors/src/test/scala/RemoteSupervisorTest.scala
@@ -471,7 +471,7 @@ class RemoteSupervisorTest extends JUnitSuite {
val factory = SupervisorFactory(
SupervisorConfig(
- RestartStrategy(AllForOne, 3, 100),
+ RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
@@ -486,7 +486,7 @@ class RemoteSupervisorTest extends JUnitSuite {
val factory = SupervisorFactory(
SupervisorConfig(
- RestartStrategy(OneForOne, 3, 100),
+ RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
@@ -504,7 +504,7 @@ class RemoteSupervisorTest extends JUnitSuite {
val factory = SupervisorFactory(
SupervisorConfig(
- RestartStrategy(AllForOne, 3, 100),
+ RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
@@ -530,7 +530,7 @@ class RemoteSupervisorTest extends JUnitSuite {
val factory = SupervisorFactory(
SupervisorConfig(
- RestartStrategy(OneForOne, 3, 100),
+ RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
@@ -556,13 +556,13 @@ class RemoteSupervisorTest extends JUnitSuite {
val factory = SupervisorFactory(
SupervisorConfig(
- RestartStrategy(AllForOne, 3, 100),
+ RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
::
SupervisorConfig(
- RestartStrategy(AllForOne, 3, 100),
+ RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong2,
LifeCycle(Permanent))
diff --git a/akka-actors/src/test/scala/SupervisorTest.scala b/akka-actors/src/test/scala/SupervisorTest.scala
index b143ef21e7..699b260301 100644
--- a/akka-actors/src/test/scala/SupervisorTest.scala
+++ b/akka-actors/src/test/scala/SupervisorTest.scala
@@ -457,7 +457,7 @@ class SupervisorTest extends JUnitSuite {
val factory = SupervisorFactory(
SupervisorConfig(
- RestartStrategy(AllForOne, 3, 100),
+ RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
@@ -470,7 +470,7 @@ class SupervisorTest extends JUnitSuite {
val factory = SupervisorFactory(
SupervisorConfig(
- RestartStrategy(OneForOne, 3, 100),
+ RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
@@ -485,7 +485,7 @@ class SupervisorTest extends JUnitSuite {
val factory = SupervisorFactory(
SupervisorConfig(
- RestartStrategy(AllForOne, 3, 100),
+ RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
@@ -508,7 +508,7 @@ class SupervisorTest extends JUnitSuite {
val factory = SupervisorFactory(
SupervisorConfig(
- RestartStrategy(OneForOne, 3, 100),
+ RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
@@ -531,13 +531,13 @@ class SupervisorTest extends JUnitSuite {
val factory = SupervisorFactory(
SupervisorConfig(
- RestartStrategy(AllForOne, 3, 100),
+ RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
::
SupervisorConfig(
- RestartStrategy(AllForOne, 3, 100),
+ RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong2,
LifeCycle(Permanent))
diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/ExampleSession.scala
index 158dbe46d0..998c520620 100644
--- a/akka-amqp/src/main/scala/ExampleSession.scala
+++ b/akka-amqp/src/main/scala/ExampleSession.scala
@@ -4,7 +4,7 @@
package se.scalablesolutions.akka.amqp
-import se.scalablesolutions.akka.actor.Actor
+import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.Actor.Sender.Self
import com.rabbitmq.client.ConnectionParameters
@@ -30,10 +30,8 @@ object ExampleSession {
def direct = {
val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, IM, ExchangeType.Direct, None, 100, false, false, Map[String, AnyRef]())
- consumer ! MessageConsumerListener("@george_bush", "direct", new Actor() {
- def receive = {
- case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
- }
+ consumer ! MessageConsumerListener("@george_bush", "direct", actor {
+ case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
})
val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, IM, None, None, 100)
producer ! Message("@jonas_boner: You sucked!!".getBytes, "direct")
@@ -41,15 +39,11 @@ object ExampleSession {
def fanout = {
val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, CHAT, ExchangeType.Fanout, None, 100, false, false, Map[String, AnyRef]())
- consumer ! MessageConsumerListener("@george_bush", "", new Actor() {
- def receive = {
- case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
- }
+ consumer ! MessageConsumerListener("@george_bush", "", actor {
+ case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
})
- consumer ! MessageConsumerListener("@barack_obama", "", new Actor() {
- def receive = {
- case Message(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
- }
+ consumer ! MessageConsumerListener("@barack_obama", "", actor {
+ case Message(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
})
val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, CHAT, None, None, 100)
producer ! Message("@jonas_boner: I'm going surfing".getBytes, "")
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
index 82347669c1..61ef403f7b 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
@@ -37,7 +37,8 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase {
bind(Ext.class).to(ExtImpl.class).in(Scopes.SINGLETON);
}
}).configure(
- new RestartStrategy(new AllForOne(), 3, 5000), new Component[]{
+ new RestartStrategy(new AllForOne(), 3, 5000, new Class[]{Exception.class}),
+ new Component[]{
new Component(
Foo.class,
new LifeCycle(new Permanent()),
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
index 26c6747eca..825d8f39fa 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
@@ -19,7 +19,7 @@ public class InMemNestedStateTest extends TestCase {
public InMemNestedStateTest() {
conf.configure(
- new RestartStrategy(new AllForOne(), 3, 5000),
+ new RestartStrategy(new AllForOne(), 3, 5000, new Class[]{Exception.class}),
new Component[]{
// FIXME: remove string-name, add ctor to only accept target class
new Component(InMemStateful.class, new LifeCycle(new Permanent()), 10000000),
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java
index e29e8ef81f..973ac14b6b 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java
@@ -23,7 +23,7 @@ public class InMemoryStateTest extends TestCase {
public InMemoryStateTest() {
Config.config();
conf.configure(
- new RestartStrategy(new AllForOne(), 3, 5000),
+ new RestartStrategy(new AllForOne(), 3, 5000, new Class[] {Exception.class}),
new Component[]{
new Component(InMemStateful.class,
new LifeCycle(new Permanent()),
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java
index 0dcb4ec992..4c74a8b1a2 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java
@@ -20,7 +20,7 @@ public class PersistentNestedStateTest extends TestCase {
protected void setUp() {
PersistenceManager.init();
conf.configure(
- new RestartStrategy(new AllForOne(), 3, 5000),
+ new RestartStrategy(new AllForOne(), 3, 5000, new Class[] {Exception.class}),
new Component[]{
// FIXME: remove string-name, add ctor to only accept target class
new Component(PersistentStateful.class, new LifeCycle(new Permanent()), 10000000),
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java
index 23f8eb1070..b8f6d7eb3e 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java
@@ -17,7 +17,7 @@ public class PersistentStateTest extends TestCase {
protected void setUp() {
PersistenceManager.init();
conf.configure(
- new RestartStrategy(new AllForOne(), 3, 5000),
+ new RestartStrategy(new AllForOne(), 3, 5000, new Class[] {Exception.class}),
new Component[] {
new Component(PersistentStateful.class, new LifeCycle(new Permanent()), 10000000),
new Component(PersistentFailer.class, new LifeCycle(new Permanent()), 1000)
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java
index 9568d6a6f1..06573759f1 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java
@@ -17,7 +17,7 @@ public class RemotePersistentStateTest extends TestCase {
protected void setUp() {
PersistenceManager.init();
conf.configure(
- new RestartStrategy(new AllForOne(), 3, 5000),
+ new RestartStrategy(new AllForOne(), 3, 5000, new Class[]{Exception.class}),
new Component[] {
new Component(PersistentStateful.class, new LifeCycle(new Permanent()), 1000000, new RemoteAddress("localhost", 9999)),
new Component(PersistentFailer.class, new LifeCycle(new Permanent()), 1000000, new RemoteAddress("localhost", 9999))
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java
index b6d6ef06dc..4eb17ce7e6 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java
@@ -32,7 +32,7 @@ public class RestTest extends TestCase {
@BeforeClass
protected void setUp() {
conf.configure(
- new RestartStrategy(new AllForOne(), 3, 5000),
+ new RestartStrategy(new AllForOne(), 3, 5000, new Class[]{Exception.class}),
new Component[] {
new Component(
JerseyFoo.class,
diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala
index 88eef6b34d..70fc709f0f 100644
--- a/akka-kernel/src/main/scala/Kernel.scala
+++ b/akka-kernel/src/main/scala/Kernel.scala
@@ -16,6 +16,8 @@ import se.scalablesolutions.akka.nio.RemoteNode
import se.scalablesolutions.akka.util.Logging
/**
+ * The Akka Kernel.
+ *
* @author Jonas Bonér
*/
object Kernel extends Logging {
@@ -36,9 +38,18 @@ object Kernel extends Logging {
def main(args: Array[String]) = boot
- def boot = synchronized {
+ /**
+ * Boots up the Kernel.
+ */
+ def boot: Unit = boot(true)
+
+ /**
+ * Boots up the Kernel.
+ * If you pass in false as parameter then the Akka banner is not printed out.
+ */
+ def boot(withBanner: Boolean): Unit = synchronized {
if (!hasBooted) {
- printBanner
+ if (withBanner) printBanner
log.info("Starting Akka...")
runApplicationBootClasses
@@ -50,7 +61,7 @@ object Kernel extends Logging {
hasBooted = true
}
}
-
+
def startRemoteService = {
// FIXME manage remote serve thread for graceful shutdown
val remoteServerThread = new Thread(new Runnable() {
@@ -112,125 +123,16 @@ object Kernel extends Logging {
private def printBanner = {
log.info(
-"""==============================
- __ __
- _____ | | _| | _______
- \__ \ | |/ / |/ /\__ \
- / __ \| <| < / __ \_
- (____ /__|_ \__|_ \(____ /
- \/ \/ \/ \/
+"""
+==============================
+ __ __
+ _____ | | _| | _______
+ \__ \ | |/ / |/ /\__ \
+ / __ \| <| < / __ \_
+ (____ /__|_ \__|_ \(____ /
+ \/ \/ \/ \/
""")
log.info(" Running version " + VERSION)
log.info("==============================")
}
-
- private def cassandraBenchmark = {
- import se.scalablesolutions.akka.state.CassandraStorage
- val NR_ENTRIES = 100000
-
- println("=================================================")
- var start = System.currentTimeMillis
- for (i <- 1 to NR_ENTRIES) CassandraStorage.insertMapStorageEntryFor("test", i.toString, "data")
- var end = System.currentTimeMillis
- println("Writes per second: " + NR_ENTRIES / ((end - start).toDouble / 1000))
-
- println("=================================================")
- start = System.currentTimeMillis
- val entries = new scala.collection.mutable.ArrayBuffer[Tuple2[String, String]]
- for (i <- 1 to NR_ENTRIES) entries += (i.toString, "data")
- CassandraStorage.insertMapStorageEntriesFor("test", entries.toList)
- end = System.currentTimeMillis
- println("Writes per second - batch: " + NR_ENTRIES / ((end - start).toDouble / 1000))
-
- println("=================================================")
- start = System.currentTimeMillis
- for (i <- 1 to NR_ENTRIES) CassandraStorage.getMapStorageEntryFor("test", i.toString)
- end = System.currentTimeMillis
- println("Reads per second: " + NR_ENTRIES / ((end - start).toDouble / 1000))
-
- System.exit(0)
- }
-}
-
-
-
-
-/*
-//import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory}
-//import voldemort.server.{VoldemortConfig, VoldemortServer}
-//import voldemort.versioning.Versioned
-
- private[this] var storageFactory: StoreClientFactory = _
- private[this] var storageServer: VoldemortServer = _
-*/
-
-// private[akka] def startVoldemort = {
-// val VOLDEMORT_SERVER_URL = "tcp://" + SERVER_URL
-// val VOLDEMORT_SERVER_PORT = 6666
-// val VOLDEMORT_BOOTSTRAP_URL = VOLDEMORT_SERVER_URL + ":" + VOLDEMORT_SERVER_PORT
-// // Start Voldemort server
-// val config = VoldemortConfig.loadFromVoldemortHome(Boot.HOME)
-// storageServer = new VoldemortServer(config)
-// storageServer.start
-// log.info("Replicated persistent storage server started at %s", VOLDEMORT_BOOTSTRAP_URL)
-//
-// // Create Voldemort client factory
-// val numThreads = 10
-// val maxQueuedRequests = 10
-// val maxConnectionsPerNode = 10
-// val maxTotalConnections = 100
-// storageFactory = new SocketStoreClientFactory(
-// numThreads,
-// numThreads,
-// maxQueuedRequests,
-// maxConnectionsPerNode,
-// maxTotalConnections,
-// VOLDEMORT_BOOTSTRAP_URL)
-//
-// val name = this.getClass.getName
-// val storage = getStorageFor("actors")
-//// val value = storage.get(name)
-// val value = new Versioned("state")
-// //value.setObject("state")
-// storage.put(name, value)
-// }
-//
-// private[akka] def getStorageFor(storageName: String): StoreClient[String, String] =
-// storageFactory.getStoreClient(storageName)
-
-// private[akka] def startZooKeeper = {
-//import org.apache.zookeeper.jmx.ManagedUtil
-//import org.apache.zookeeper.server.persistence.FileTxnSnapLog
-//import org.apache.zookeeper.server.ServerConfig
-//import org.apache.zookeeper.server.NIOServerCnxn
-// val ZOO_KEEPER_SERVER_URL = SERVER_URL
-// val ZOO_KEEPER_SERVER_PORT = 9898
-// try {
-// ManagedUtil.registerLog4jMBeans
-// ServerConfig.parse(args)
-// } catch {
-// case e: JMException => log.warning("Unable to register log4j JMX control: s%", e)
-// case e => log.fatal("Error in ZooKeeper config: s%", e)
-// }
-// val factory = new ZooKeeperServer.Factory() {
-// override def createConnectionFactory = new NIOServerCnxn.Factory(ServerConfig.getClientPort)
-// override def createServer = {
-// val server = new ZooKeeperServer
-// val txLog = new FileTxnSnapLog(
-// new File(ServerConfig.getDataLogDir),
-// new File(ServerConfig.getDataDir))
-// server.setTxnLogFactory(txLog)
-// server
-// }
-// }
-// try {
-// val zooKeeper = factory.createServer
-// zooKeeper.startup
-// log.info("ZooKeeper started")
-// // TODO: handle clean shutdown as below in separate thread
-// // val cnxnFactory = serverFactory.createConnectionFactory
-// // cnxnFactory.setZooKeeperServer(zooKeeper)
-// // cnxnFactory.join
-// // if (zooKeeper.isRunning) zooKeeper.shutdown
-// } catch { case e => log.fatal("Unexpected exception: s%",e) }
-// }
+}
\ No newline at end of file
diff --git a/akka-persistence/src/main/scala/PersistentState.scala b/akka-persistence/src/main/scala/PersistentState.scala
index 3a63ea16f9..f08d2cd925 100644
--- a/akka-persistence/src/main/scala/PersistentState.scala
+++ b/akka-persistence/src/main/scala/PersistentState.scala
@@ -4,9 +4,11 @@
package se.scalablesolutions.akka.state
-import util.Logging
import se.scalablesolutions.akka.stm.TransactionManagement.currentTransaction
import se.scalablesolutions.akka.collection._
+import se.scalablesolutions.akka.util.Logging
+
+import org.codehaus.aspectwerkz.proxy.Uuid
class NoTransactionInScopeException extends RuntimeException
@@ -18,34 +20,67 @@ case class TokyoCabinetStorageConfig() extends PersistentStorageConfig
case class MongoStorageConfig() extends PersistentStorageConfig
/**
- * Example Scala usage:
+ * Example Scala usage.
+ *
+ * New map with generated id.
*
* val myMap = PersistentState.newMap(CassandraStorageConfig)
*
- *
+ *
+ * New map with user-defined id.
+ *
+ * val myMap = PersistentState.newMap(CassandraStorageConfig, id)
+ *
+ *
+ * Get map by user-defined id.
+ *
+ * val myMap = PersistentState.getMap(CassandraStorageConfig, id)
+ *
+ *
* Example Java usage:
*
* TransactionalMap myMap = PersistentState.newMap(new CassandraStorageConfig());
*
+ *
+ * @author Jonas Bonér
*/
object PersistentState {
- def newMap(config: PersistentStorageConfig): PersistentMap = config match {
- case CassandraStorageConfig() => new CassandraPersistentMap
- case MongoStorageConfig() => new MongoPersistentMap
+ def newMap(config: PersistentStorageConfig): PersistentMap =
+ // FIXME: won't work across the remote machines, use [http://johannburkard.de/software/uuid/]
+ newMap(config, Uuid.newUuid.toString)
+
+ def newVector(config: PersistentStorageConfig): PersistentVector =
+ newVector(config, Uuid.newUuid.toString)
+
+ def newRef(config: PersistentStorageConfig): PersistentRef =
+ newRef(config, Uuid.newUuid.toString)
+
+ def getMap(config: PersistentStorageConfig, id: String): PersistentMap =
+ newMap(config, id)
+
+ def getVector(config: PersistentStorageConfig, id: String): PersistentVector =
+ newVector(config, id)
+
+ def getRef(config: PersistentStorageConfig, id: String): PersistentRef =
+ newRef(config, id)
+
+ def newMap(config: PersistentStorageConfig, id: String): PersistentMap = config match {
+ case CassandraStorageConfig() => new CassandraPersistentMap(id)
+ case MongoStorageConfig() => new MongoPersistentMap(id)
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
- def newVector(config: PersistentStorageConfig): PersistentVector = config match {
- case CassandraStorageConfig() => new CassandraPersistentVector
- case MongoStorageConfig() => new MongoPersistentVector
+ def newVector(config: PersistentStorageConfig, id: String): PersistentVector = config match {
+ case CassandraStorageConfig() => new CassandraPersistentVector(id)
+ case MongoStorageConfig() => new MongoPersistentVector(id)
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
- def newRef(config: PersistentStorageConfig): PersistentRef = config match {
- case CassandraStorageConfig() => new CassandraPersistentRef
- case MongoStorageConfig() => new MongoPersistentRef
+ def newRef(config: PersistentStorageConfig, id: String): PersistentRef = config match {
+ case CassandraStorageConfig() => new CassandraPersistentRef(id)
+ case MongoStorageConfig() => new MongoPersistentRef(id)
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
@@ -60,7 +95,8 @@ object PersistentState {
*
* @author Jonas Bonér
*/
-trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] with Transactional with Committable with Logging {
+trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef]
+ with Transactional with Committable with Logging {
protected val newAndUpdatedEntries = TransactionalState.newMap[AnyRef, AnyRef]
protected val removedEntries = TransactionalState.newVector[AnyRef]
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
@@ -71,7 +107,8 @@ trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] with Tr
def commit = {
storage.removeMapStorageFor(uuid, removedEntries.toList)
storage.insertMapStorageEntriesFor(uuid, newAndUpdatedEntries.toList)
- if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get.get) storage.removeMapStorageFor(uuid)
+ if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get.get)
+ storage.removeMapStorageFor(uuid)
newAndUpdatedEntries.clear
removedEntries.clear
}
@@ -95,9 +132,11 @@ trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] with Tr
removedEntries.add(key)
}
- def slice(start: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] = slice(start, None, count)
+ def slice(start: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] =
+ slice(start, None, count)
- def slice(start: Option[AnyRef], finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] = try {
+ def slice(start: Option[AnyRef], finish: Option[AnyRef], count: Int):
+ List[Tuple2[AnyRef, AnyRef]] = try {
storage.getMapStorageRangeFor(uuid, start, finish, count)
} catch { case e: Exception => Nil }
@@ -107,7 +146,8 @@ trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] with Tr
}
override def contains(key: AnyRef): Boolean = try {
- newAndUpdatedEntries.contains(key) || storage.getMapStorageEntryFor(uuid, key).isDefined
+ newAndUpdatedEntries.contains(key) ||
+ storage.getMapStorageEntryFor(uuid, key).isDefined
} catch { case e: Exception => false }
override def size: Int = try {
@@ -148,22 +188,22 @@ trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] with Tr
}
/**
- * Implements a persistent transaction
-
- al map based on the Cassandra distributed P2P key-value storage.
+ * Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
*
- * @author Debasish Ghosh
+ * @author Jonas Bonér
*/
-class CassandraPersistentMap extends PersistentMap {
+class CassandraPersistentMap(id: String) extends PersistentMap {
+ val uuid = id
val storage = CassandraStorage
}
/**
- * Implements a persistent transactional map based on the MongoDB distributed P2P key-value storage.
+ * Implements a persistent transactional map based on the MongoDB document storage.
*
* @author Debasish Ghosh
*/
-class MongoPersistentMap extends PersistentMap {
+class MongoPersistentMap(id: String) extends PersistentMap {
+ val uuid = id
val storage = MongoStorage
}
@@ -244,20 +284,24 @@ trait PersistentVector extends RandomAccessSeq[AnyRef] with Transactional with C
}
/**
- * Implements a persistent transactional vector based on the Cassandra distributed P2P key-value storage.
+ * Implements a persistent transactional vector based on the Cassandra
+ * distributed P2P key-value storage.
*
* @author Jonas Bonér
*/
-class CassandraPersistentVector extends PersistentVector {
+class CassandraPersistentVector(id: String) extends PersistentVector {
+ val uuid = id
val storage = CassandraStorage
}
/**
- * Implements a persistent transactional vector based on the MongoDB distributed P2P key-value storage.
+ * Implements a persistent transactional vector based on the MongoDB
+ * document storage.
*
* @author Debaissh Ghosh
*/
-class MongoPersistentVector extends PersistentVector {
+class MongoPersistentVector(id: String) extends PersistentVector {
+ val uuid = id
val storage = MongoStorage
}
@@ -297,10 +341,12 @@ trait PersistentRef extends Transactional with Committable {
}
}
-class CassandraPersistentRef extends PersistentRef {
+class CassandraPersistentRef(id: String) extends PersistentRef {
+ val uuid = id
val storage = CassandraStorage
}
-class MongoPersistentRef extends PersistentRef {
+class MongoPersistentRef(id: String) extends PersistentRef {
+ val uuid = id
val storage = MongoStorage
}
diff --git a/akka-samples-java/src/main/java/sample/java/Boot.java b/akka-samples-java/src/main/java/sample/java/Boot.java
index 55a3de1c1f..888344013d 100644
--- a/akka-samples-java/src/main/java/sample/java/Boot.java
+++ b/akka-samples-java/src/main/java/sample/java/Boot.java
@@ -8,7 +8,7 @@ public class Boot {
public Boot() throws Exception {
manager.configure(
- new RestartStrategy(new OneForOne(), 3, 5000),
+ new RestartStrategy(new OneForOne(), 3, 5000, new Class[]{Exception.class}),
new Component[] {
new Component(
sample.java.SimpleService.class,
diff --git a/akka-samples-lift/src/main/scala/bootstrap/liftweb/Boot.scala b/akka-samples-lift/src/main/scala/bootstrap/liftweb/Boot.scala
index 553d90b277..b6098cfb3c 100644
--- a/akka-samples-lift/src/main/scala/bootstrap/liftweb/Boot.scala
+++ b/akka-samples-lift/src/main/scala/bootstrap/liftweb/Boot.scala
@@ -38,7 +38,7 @@ class Boot {
val factory = SupervisorFactory(
SupervisorConfig(
- RestartStrategy(OneForOne, 3, 100),
+ RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(
new SimpleService,
LifeCycle(Permanent)) ::
diff --git a/akka-samples-scala/src/main/scala/SimpleService.scala b/akka-samples-scala/src/main/scala/SimpleService.scala
index 361c699da7..5fa1fc6666 100644
--- a/akka-samples-scala/src/main/scala/SimpleService.scala
+++ b/akka-samples-scala/src/main/scala/SimpleService.scala
@@ -20,7 +20,7 @@ import org.atmosphere.cpr.BroadcastFilter
class Boot {
val factory = SupervisorFactory(
SupervisorConfig(
- RestartStrategy(OneForOne, 3, 100),
+ RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(
new SimpleService,
LifeCycle(Permanent)) ::
diff --git a/akka-samples-security/src/main/scala/SimpleService.scala b/akka-samples-security/src/main/scala/SimpleService.scala
index cca4246933..fc8b18367a 100644
--- a/akka-samples-security/src/main/scala/SimpleService.scala
+++ b/akka-samples-security/src/main/scala/SimpleService.scala
@@ -13,7 +13,7 @@ import se.scalablesolutions.akka.state.TransactionalState
class Boot {
val factory = SupervisorFactory(
SupervisorConfig(
- RestartStrategy(OneForOne, 3, 100),
+ RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
// Dummy implementations of all authentication actors
// see akka.conf to enable one of these for the AkkaSecurityFilterFactory
Supervise(
diff --git a/changes.xml b/changes.xml
index 658b8ab550..dd7514bbec 100644
--- a/changes.xml
+++ b/changes.xml
@@ -62,6 +62,7 @@ see http://maven.apache.org/plugins/maven-changes-plugin/usage.html for full gui
New URL: http://akkasource.org
Enhanced trapping of failures: 'trapExit = List(classOf[..], classOf[..])'
Upgraded to Netty 3.2, Protobuf 2.2, ScalaTest 1.0, Jersey 1.1.3, Atmosphere 0.4.1, Cassandra 0.4.1, Configgy 1.4
+ Lowered actor memory footprint; now an actor consumes ~625 bytes, which mean that you can create 6.5 million on 4 G RAM
Concurrent mode is now per actor basis
Remote actors are now defined by their UUID (not class name)
Fixed dispatcher bug
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 8ca35927e9..3fa382f252 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -28,6 +28,7 @@
service = on
+ max-nr-of-retries = 100
distributed = off # not implemented yet
diff --git a/embedded-repo/com/facebook/fb303/1.0/fb303-1.0.jar b/embedded-repo/com/facebook/fb303/1.0/fb303-1.0.jar
deleted file mode 100644
index 129328deff..0000000000
Binary files a/embedded-repo/com/facebook/fb303/1.0/fb303-1.0.jar and /dev/null differ
diff --git a/embedded-repo/com/facebook/fb303/1.0/fb303-1.0.pom b/embedded-repo/com/facebook/fb303/1.0/fb303-1.0.pom
deleted file mode 100644
index dc99125995..0000000000
--- a/embedded-repo/com/facebook/fb303/1.0/fb303-1.0.pom
+++ /dev/null
@@ -1,8 +0,0 @@
-
-
- 4.0.0
- com.facebook
- fb303
- 1.0
- jar
-
\ No newline at end of file
diff --git a/embedded-repo/com/rabbitmq/rabbitmq-client/0.9.1/rabbitmq-client-0.9.1.pom b/embedded-repo/com/rabbitmq/rabbitmq-client/0.9.1/rabbitmq-client-0.9.1.pom
index 905b5e83dc..c78d868476 100644
--- a/embedded-repo/com/rabbitmq/rabbitmq-client/0.9.1/rabbitmq-client-0.9.1.pom
+++ b/embedded-repo/com/rabbitmq/rabbitmq-client/0.9.1/rabbitmq-client-0.9.1.pom
@@ -1,8 +1,8 @@
4.0.0
- com.rabbitmg
- rabbitmg-client
+ com.rabbitmq
+ rabbitmq-client
0.9.1
jar
\ No newline at end of file
diff --git a/embedded-repo/com/twitter/scala-stats/1.0/scala-stats-1.0.jar b/embedded-repo/com/twitter/scala-stats/1.0/scala-stats-1.0.jar
deleted file mode 100755
index 6b1b43bf7b..0000000000
Binary files a/embedded-repo/com/twitter/scala-stats/1.0/scala-stats-1.0.jar and /dev/null differ
diff --git a/embedded-repo/com/twitter/scala-stats/1.0/scala-stats-1.0.pom b/embedded-repo/com/twitter/scala-stats/1.0/scala-stats-1.0.pom
deleted file mode 100755
index d35560379a..0000000000
--- a/embedded-repo/com/twitter/scala-stats/1.0/scala-stats-1.0.pom
+++ /dev/null
@@ -1,8 +0,0 @@
-
-
- 4.0.0
- com.twitter
- scala-stats
- 1.0
- jar
-
\ No newline at end of file