Merge commit 'origin/master' into Atmosphere0.5

This commit is contained in:
Viktor Klang 2009-12-02 20:21:29 +01:00
commit 45193485be
35 changed files with 453 additions and 331 deletions

View file

@ -37,73 +37,145 @@ object ActiveObject {
val AKKA_CAMEL_ROUTING_SCHEME = "akka"
def newInstance[T](target: Class[T], timeout: Long): T =
newInstance(target, new Dispatcher(None), None, timeout)
newInstance(target, new Dispatcher(false, None), None, timeout)
def newInstance[T](target: Class[T], timeout: Long, restartCallbacks: Option[RestartCallbacks]): T =
newInstance(target, new Dispatcher(restartCallbacks), None, timeout)
newInstance(target, new Dispatcher(false, restartCallbacks), None, timeout)
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T =
newInstance(intf, target, new Dispatcher(None), None, timeout)
newInstance(intf, target, new Dispatcher(false, None), None, timeout)
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, restartCallbacks: Option[RestartCallbacks]): T =
newInstance(intf, target, new Dispatcher(restartCallbacks), None, timeout)
newInstance(intf, target, new Dispatcher(false, restartCallbacks), None, timeout)
def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean): T =
newInstance(target, new Dispatcher(transactionRequired, None), None, timeout)
def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, restartCallbacks: Option[RestartCallbacks]): T =
newInstance(target, new Dispatcher(transactionRequired, restartCallbacks), None, timeout)
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean): T =
newInstance(intf, target, new Dispatcher(transactionRequired, None), None, timeout)
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, restartCallbacks: Option[RestartCallbacks]): T =
newInstance(intf, target, new Dispatcher(transactionRequired, restartCallbacks), None, timeout)
def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T =
newInstance(target, new Dispatcher(None), Some(new InetSocketAddress(hostname, port)), timeout)
newInstance(target, new Dispatcher(false, None), Some(new InetSocketAddress(hostname, port)), timeout)
def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T =
newInstance(target, new Dispatcher(restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout)
newInstance(target, new Dispatcher(false, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout)
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int): T =
newInstance(intf, target, new Dispatcher(None), Some(new InetSocketAddress(hostname, port)), timeout)
newInstance(intf, target, new Dispatcher(false, None), Some(new InetSocketAddress(hostname, port)), timeout)
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T =
newInstance(intf, target, new Dispatcher(restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout)
newInstance(intf, target, new Dispatcher(false, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout)
def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, hostname: String, port: Int): T =
newInstance(target, new Dispatcher(transactionRequired, None), Some(new InetSocketAddress(hostname, port)), timeout)
def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T =
newInstance(target, new Dispatcher(transactionRequired, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout)
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, hostname: String, port: Int): T =
newInstance(intf, target, new Dispatcher(transactionRequired, None), Some(new InetSocketAddress(hostname, port)), timeout)
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T =
newInstance(intf, target, new Dispatcher(transactionRequired, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout)
def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher): T = {
val actor = new Dispatcher(None)
val actor = new Dispatcher(false, None)
actor.messageDispatcher = dispatcher
newInstance(target, actor, None, timeout)
}
def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(restartCallbacks)
val actor = new Dispatcher(false, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(target, actor, None, timeout)
}
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher): T = {
val actor = new Dispatcher(None)
val actor = new Dispatcher(false, None)
actor.messageDispatcher = dispatcher
newInstance(intf, target, actor, None, timeout)
}
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(restartCallbacks)
val actor = new Dispatcher(false, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(intf, target, actor, None, timeout)
}
def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher): T = {
val actor = new Dispatcher(transactionRequired, None)
actor.messageDispatcher = dispatcher
newInstance(target, actor, None, timeout)
}
def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(transactionRequired, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(target, actor, None, timeout)
}
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher): T = {
val actor = new Dispatcher(transactionRequired, None)
actor.messageDispatcher = dispatcher
newInstance(intf, target, actor, None, timeout)
}
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(transactionRequired, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(intf, target, actor, None, timeout)
}
def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
val actor = new Dispatcher(None)
val actor = new Dispatcher(false, None)
actor.messageDispatcher = dispatcher
newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(restartCallbacks)
val actor = new Dispatcher(false, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
val actor = new Dispatcher(None)
val actor = new Dispatcher(false, None)
actor.messageDispatcher = dispatcher
newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(restartCallbacks)
val actor = new Dispatcher(false, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
val actor = new Dispatcher(transactionRequired, None)
actor.messageDispatcher = dispatcher
newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(transactionRequired, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
val actor = new Dispatcher(transactionRequired, None)
actor.messageDispatcher = dispatcher
newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(transactionRequired, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
@ -282,7 +354,7 @@ private[akka] sealed class ActiveObjectAspect {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] class Dispatcher(val callbacks: Option[RestartCallbacks]) extends Actor {
private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Option[RestartCallbacks]) extends Actor {
private val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]()
private val ZERO_ITEM_OBJECT_ARRAY = Array[Object[_]]()
@ -292,7 +364,7 @@ private[akka] class Dispatcher(val callbacks: Option[RestartCallbacks]) extends
private var initTxState: Option[Method] = None
private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef) = {
if (targetClass.isAnnotationPresent(Annotations.transactionrequired)) makeTransactionRequired
if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired)) makeTransactionRequired
id = targetClass.getName
target = Some(targetInstance)
val methods = targetInstance.getClass.getDeclaredMethods.toList

View file

@ -66,7 +66,7 @@ class ActorMessageInvoker(val actor: Actor) extends MessageInvoker {
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Actor {
object Actor extends Logging {
val TIMEOUT = config.getInt("akka.actor.timeout", 5000)
val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
@ -74,7 +74,7 @@ object Actor {
implicit val Self: AnyRef = this
def receive = {
case unknown =>
log.error(
Actor.log.error(
"Actor.Sender can't process messages. Received message [%s]." +
"This error could occur if you either:" +
"\n\t- Explicitly send a message to the Actor.Sender object." +
@ -115,7 +115,7 @@ object Actor {
* </pre>
*
*/
def actor[A](body: => Unit) = {
def actor[A](body: => Unit) = {
def handler[A](body: Unit) = new {
def receive(handler: PartialFunction[Any, Unit]) = new Actor() {
start
@ -129,7 +129,7 @@ object Actor {
/**
* Use to create an anonymous event-driven actor with a body but no message loop block.
* <p/>
* This actor can <b>not</b> respond to any messages but can be used as a simple way to
* This actor can <b>not</b> respond to any messages but can be used as a simple way to
* spawn a lightweight thread to process some task.
* <p/>
* The actor is started when created.
@ -163,7 +163,7 @@ object Actor {
* </pre>
*/
def actor(lifeCycleConfig: LifeCycle)(body: PartialFunction[Any, Unit]): Actor = new Actor() {
lifeCycle = lifeCycleConfig
lifeCycle = Some(lifeCycleConfig)
start
def receive = body
}
@ -183,10 +183,10 @@ object Actor {
* }
* </pre>
*/
def actor[A](lifeCycleConfig: LifeCycle)(body: => Unit) = {
def actor[A](lifeCycleConfig: LifeCycle)(body: => Unit) = {
def handler[A](body: Unit) = new {
def receive(handler: PartialFunction[Any, Unit]) = new Actor() {
lifeCycle = lifeCycleConfig
lifeCycle = Some(lifeCycleConfig)
start
body
def receive = handler
@ -209,15 +209,15 @@ object Actor {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Actor extends Logging with TransactionManagement {
trait Actor extends TransactionManagement {
ActorRegistry.register(this)
implicit protected val self: Actor = this
// FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait
private[akka] var _uuid = Uuid.newUuid.toString
def uuid = _uuid
// ====================================
// private fields
// ====================================
@ -226,9 +226,9 @@ trait Actor extends Logging with TransactionManagement {
@volatile private var _isShutDown: Boolean = false
private var _hotswap: Option[PartialFunction[Any, Unit]] = None
private var _config: Option[AnyRef] = None
private val _remoteFlagLock = new ReadWriteLock
private val _remoteFlagLock = new ReadWriteLock
private[akka] var _remoteAddress: Option[InetSocketAddress] = None
private[akka] val _linkedActors = new HashSet[Actor]
private[akka] var _linkedActors: Option[HashSet[Actor]] = None
private[akka] var _mailbox: MessageQueue = _
private[akka] var _supervisor: Option[Actor] = None
@ -244,7 +244,7 @@ trait Actor extends Logging with TransactionManagement {
* <p/>
* This sender reference can be used together with the '!' method for request/reply
* message exchanges and which is in many ways better than using the '!!' method
* which will make the sender wait for a reply using a *blocking* future.
* which will make the sender wait for a reply using a *blocking* future.
*/
protected[this] var sender: Option[Actor] = None
@ -284,7 +284,7 @@ trait Actor extends Logging with TransactionManagement {
* <p/>
* You can override it so it fits the specific use-case that the actor is used for.
* See the <tt>se.scalablesolutions.akka.dispatch.Dispatchers</tt> class for the different
* dispatchers available.
* dispatchers available.
* <p/>
* The default is also that all actors that are created and spawned from within this actor
* is sharing the same dispatcher as its creator.
@ -301,7 +301,7 @@ trait Actor extends Logging with TransactionManagement {
* Set trapExit to the list of exception classes that the actor should be able to trap
* from the actor it is supervising. When the supervising actor throws these exceptions
* then they will trigger a restart.
* <p/>
* <p/>
* <pre>
* // trap all exceptions
* trapExit = List(classOf[Throwable])
@ -328,9 +328,9 @@ trait Actor extends Logging with TransactionManagement {
/**
* User overridable callback/setting.
*
* Defines the life-cycle for a supervised actor. Default is 'LifeCycle(Permanent)' but can be overridden.
* Defines the life-cycle for a supervised actor.
*/
@volatile var lifeCycle: LifeCycle = LifeCycle(Permanent)
@volatile var lifeCycle: Option[LifeCycle] = None
/**
* User overridable callback/setting.
@ -411,14 +411,14 @@ trait Actor extends Logging with TransactionManagement {
* Starts up the actor and its message queue.
*/
def start: Actor = synchronized {
if (_isShutDown) throw new IllegalStateException("Can't restart an actor that have been shut down with 'exit'")
if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'exit'")
if (!_isRunning) {
dispatcher.registerHandler(this, new ActorMessageInvoker(this))
messageDispatcher.start
_isRunning = true
//if (isTransactional) this !! TransactionalInit
}
log.info("[%s] has started", toString)
Actor.log.info("[%s] has started", toString)
this
}
@ -438,6 +438,7 @@ trait Actor extends Logging with TransactionManagement {
_isRunning = false
_isShutDown = true
shutdown
ActorRegistry.unregister(this)
}
}
@ -447,7 +448,7 @@ trait Actor extends Logging with TransactionManagement {
*
* If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument.
* <p/>
*
*
* This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable.
* <pre>
* actor ! message
@ -513,7 +514,7 @@ trait Actor extends Logging with TransactionManagement {
getResultOrThrowException(future)
} else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
/**
* Sends a message asynchronously and waits on a future for a reply message.
* <p/>
@ -529,10 +530,10 @@ trait Actor extends Logging with TransactionManagement {
def !![T](message: AnyRef): Option[T] = !![T](message, timeout)
/**
* This method is evil and have been removed. Use '!!' with a timeout instead.
* This method is evil and has been removed. Use '!!' with a timeout instead.
*/
def !?[T](message: AnyRef): T = throw new UnsupportedOperationException(
"'!?' is evil and have been removed. Use '!!' with a timeout instead")
"'!?' is evil and has been removed. Use '!!' with a timeout instead")
/**
* Use <code>reply(..)</code> to reply with a message to the original sender of the message currently
@ -551,7 +552,8 @@ trait Actor extends Logging with TransactionManagement {
"\n\t\t1. Send a message to a remote actor" +
"\n\t\t2. Send a message from an instance that is *not* an actor" +
"\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " +
"\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future that will be bound by the argument passed to 'reply'." )
"\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" +
"\n\tthat will be bound by the argument passed to 'reply'." )
case Some(future) =>
future.completeWithResult(message)
}
@ -574,7 +576,7 @@ trait Actor extends Logging with TransactionManagement {
} else throw new IllegalArgumentException(
"Can not swap dispatcher for " + toString + " after it has been started")
}
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
@ -615,11 +617,11 @@ trait Actor extends Logging with TransactionManagement {
*/
protected[this] def link(actor: Actor) = {
if (_isRunning) {
_linkedActors.add(actor)
getLinkedActors.add(actor)
if (actor._supervisor.isDefined) throw new IllegalStateException(
"Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails")
actor._supervisor = Some(this)
log.debug("Linking actor [%s] to actor [%s]", actor, this)
Actor.log.debug("Linking actor [%s] to actor [%s]", actor, this)
} else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
@ -631,10 +633,11 @@ trait Actor extends Logging with TransactionManagement {
*/
protected[this] def unlink(actor: Actor) = {
if (_isRunning) {
if (!_linkedActors.contains(actor)) throw new IllegalStateException("Actor [" + actor + "] is not a linked actor, can't unlink")
_linkedActors.remove(actor)
if (!getLinkedActors.contains(actor)) throw new IllegalStateException(
"Actor [" + actor + "] is not a linked actor, can't unlink")
getLinkedActors.remove(actor)
actor._supervisor = None
log.debug("Unlinking actor [%s] from actor [%s]", actor, this)
Actor.log.debug("Unlinking actor [%s] from actor [%s]", actor, this)
} else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
@ -741,7 +744,7 @@ trait Actor extends Logging with TransactionManagement {
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long):
CompletableFutureResult = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(this.getClass.getName)
.setTimeout(timeout)
@ -773,7 +776,7 @@ trait Actor extends Logging with TransactionManagement {
else dispatch(messageHandle)
} catch {
case e =>
log.error(e, "Could not invoke actor [%s]", this)
Actor.log.error(e, "Could not invoke actor [%s]", this)
throw e
}
}
@ -790,7 +793,7 @@ trait Actor extends Logging with TransactionManagement {
else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString)
} catch {
case e =>
log.error(e, "Could not invoke actor [%s]", this)
Actor.log.error(e, "Could not invoke actor [%s]", this)
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e)
@ -801,7 +804,7 @@ trait Actor extends Logging with TransactionManagement {
private def transactionalDispatch[T](messageHandle: MessageInvocation) = {
setTransaction(messageHandle.tx)
val message = messageHandle.message //serializeMessage(messageHandle.message)
senderFuture = messageHandle.future
sender = messageHandle.sender
@ -817,7 +820,7 @@ trait Actor extends Logging with TransactionManagement {
decrementTransaction
}
}
try {
if (isTransactionRequiresNew && !isTransactionInScope) {
if (senderFuture.isEmpty) throw new StmException(
@ -830,7 +833,7 @@ trait Actor extends Logging with TransactionManagement {
} else proceed
} catch {
case e =>
log.error(e, "Could not invoke actor [%s]", this)
Actor.log.error(e, "Could not invoke actor [%s]", this)
if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e)
clearTransaction // need to clear currentTransaction before call to supervisor
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
@ -871,15 +874,16 @@ trait Actor extends Logging with TransactionManagement {
}
private[this] def restartLinkedActors(reason: AnyRef) = {
_linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor =>
actor.lifeCycle match {
getLinkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor =>
if (actor.lifeCycle.isEmpty) actor.lifeCycle = Some(LifeCycle(Permanent))
actor.lifeCycle.get match {
case LifeCycle(scope, _) => {
scope match {
case Permanent =>
actor.restart(reason)
case Temporary =>
log.info("Actor [%s] configured as TEMPORARY will not be restarted.", actor.id)
_linkedActors.remove(actor) // remove the temporary actor
Actor.log.info("Actor [%s] configured as TEMPORARY will not be restarted.", actor.id)
getLinkedActors.remove(actor) // remove the temporary actor
}
}
}
@ -888,7 +892,7 @@ trait Actor extends Logging with TransactionManagement {
private[Actor] def restart(reason: AnyRef) = synchronized {
preRestart(reason, _config)
log.info("Restarting actor [%s] configured as PERMANENT.", id)
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
postRestart(reason, _config)
}
@ -899,6 +903,13 @@ trait Actor extends Logging with TransactionManagement {
} else None
}
protected def getLinkedActors: HashSet[Actor] = {
if (_linkedActors.isEmpty) {
val set = new HashSet[Actor]
_linkedActors = Some(set)
set
} else _linkedActors.get
}
private[akka] def swapDispatcher(disp: MessageDispatcher) = synchronized {
messageDispatcher = disp

View file

@ -43,4 +43,9 @@ object ActorRegistry {
case None => actorsById + (id -> (actor :: Nil))
}
}
def unregister(actor: Actor) = synchronized {
actorsByClassName - actor.getClass.getName
actorsById - actor.getClass.getName
}
}

View file

@ -28,7 +28,7 @@ case class SchedulerException(msg: String, e: Throwable) extends RuntimeExceptio
* which is licensed under the Apache 2 License.
*/
class ScheduleActor(val receiver: Actor, val future: ScheduledFuture[AnyRef]) extends Actor with Logging {
lifeCycle = LifeCycle(Permanent)
lifeCycle = Some(LifeCycle(Permanent))
def receive = {
case UnSchedule =>

View file

@ -23,7 +23,7 @@ case class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends
* <pre>
* val factory = SupervisorFactory(
* SupervisorConfig(
* RestartStrategy(OneForOne, 3, 10),
* RestartStrategy(OneForOne, 3, 10, List(classOf[Exception]),
* Supervise(
* myFirstActor,
* LifeCycle(Permanent)) ::
@ -43,6 +43,7 @@ case class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class SupervisorFactory(val config: SupervisorConfig) extends Logging {
type ExceptionList = List[Class[_ <: Throwable]]
def newInstance: Supervisor = newInstanceFor(config)
@ -55,10 +56,10 @@ class SupervisorFactory(val config: SupervisorConfig) extends Logging {
}
protected def create(strategy: RestartStrategy): Supervisor = strategy match {
case RestartStrategy(scheme, maxNrOfRetries, timeRange) =>
case RestartStrategy(scheme, maxNrOfRetries, timeRange, trapExceptions: ExceptionList) =>
scheme match {
case AllForOne => new Supervisor(AllForOneStrategy(maxNrOfRetries, timeRange))
case OneForOne => new Supervisor(OneForOneStrategy(maxNrOfRetries, timeRange))
case AllForOne => new Supervisor(AllForOneStrategy(maxNrOfRetries, timeRange), trapExceptions)
case OneForOne => new Supervisor(OneForOneStrategy(maxNrOfRetries, timeRange), trapExceptions)
}
}
}
@ -79,23 +80,23 @@ object SupervisorFactory {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed class Supervisor private[akka] (handler: FaultHandlingStrategy)
extends Actor with Logging with Configurator {
trapExit = List(classOf[Throwable])
sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]])
extends Actor with Logging with Configurator {
trapExit = trapExceptions
faultHandler = Some(handler)
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
val actors = new ConcurrentHashMap[String, Actor]
private val actors = new ConcurrentHashMap[String, Actor]
// Cheating, should really go through the dispatcher rather than direct access to a CHM
def getInstance[T](clazz: Class[T]) = actors.get(clazz.getName).asInstanceOf[T]
def getComponentInterfaces: List[Class[_]] = actors.values.toArray.toList.map(_.getClass)
def isDefined(clazz: Class[_]): Boolean = actors.containsKey(clazz.getName)
override def start = synchronized {
override def start: Actor = synchronized {
ConfiguratorRepository.registerConfigurator(this)
_linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor =>
getLinkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor =>
actor.start
log.info("Starting actor: %s", actor)
}
@ -104,7 +105,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy)
override def stop = synchronized {
super[Actor].stop
_linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor =>
getLinkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor =>
actor.stop
log.info("Shutting actor down: %s", actor)
}
@ -112,7 +113,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy)
}
def receive = {
case unknown => throw new IllegalArgumentException("Supervisor does not respond to any messages. Unknown message [" + unknown + "]")
case unknown => throw new IllegalArgumentException("Supervisor " + toString + " does not respond to any messages. Unknown message [" + unknown + "]")
}
def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match {
@ -121,12 +122,14 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy)
server match {
case Supervise(actor, lifeCycle) =>
actors.put(actor.getClass.getName, actor)
actor.lifeCycle = lifeCycle
actor.lifeCycle = Some(lifeCycle)
startLink(actor)
case SupervisorConfig(_, _) => // recursive configuration
factory.newInstanceFor(server.asInstanceOf[SupervisorConfig]).start
// FIXME what to do with recursively supervisors?
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
val supervisor = factory.newInstanceFor(supervisorConfig).start
supervisor.lifeCycle = Some(LifeCycle(Permanent))
actors.put(supervisor.getClass.getName, supervisor)
link(supervisor)
})
}
}

View file

@ -19,6 +19,8 @@ import java.net.InetSocketAddress
import java.lang.reflect.Method
/**
* This is an class for internal usage. Instead use the <code>se.scalablesolutions.akka.config.ActiveObjectConfigurator</code> class for creating ActiveObjects.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfiguratorBase with Logging { // with CamelConfigurator {
@ -44,9 +46,12 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
*/
override def getInstance[T](clazz: Class[T]): T = synchronized {
log.debug("Retrieving active object [%s]", clazz.getName)
if (injector == null) throw new IllegalStateException("inject() and/or supervise() must be called before invoking getInstance(clazz)")
if (injector == null) throw new IllegalStateException(
"inject() and/or supervise() must be called before invoking getInstance(clazz)")
val (proxy, targetInstance, component) =
activeObjectRegistry.getOrElse(clazz, throw new IllegalStateException("Class [" + clazz.getName + "] has not been put under supervision (by passing in the config to the 'configure' and then invoking 'supervise') method"))
activeObjectRegistry.getOrElse(clazz, throw new IllegalStateException(
"Class [" + clazz.getName + "] has not been put under supervision " +
"(by passing in the config to the 'configure' and then invoking 'supervise') method"))
injector.injectMembers(targetInstance)
proxy.asInstanceOf[T]
}
@ -96,10 +101,12 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
private def newSubclassingProxy(component: Component): DependencyBinding = {
val targetClass = component.target
val actor = new Dispatcher(component.lifeCycle.callbacks)
val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)
if (component.dispatcher.isDefined) actor.swapDispatcher(component.dispatcher.get)
val remoteAddress =
if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
if (component.remoteAddress.isDefined)
Some(new InetSocketAddress(
component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
val proxy = ActiveObject.newInstance(targetClass, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
supervised ::= Supervise(actor, component.lifeCycle)
@ -111,12 +118,14 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
val targetClass = component.intf.get
val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry
component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true)
val actor = new Dispatcher(component.lifeCycle.callbacks)
val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)
if (component.dispatcher.isDefined) actor.swapDispatcher(component.dispatcher.get)
val remoteAddress =
if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
if (component.remoteAddress.isDefined)
Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
val proxy = ActiveObject.newInstance(targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
val proxy = ActiveObject.newInstance(
targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
supervised ::= Supervise(actor, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, targetInstance, component))
new DependencyBinding(targetClass, proxy)

View file

@ -22,7 +22,11 @@ object ScalaConfig {
case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server
case class Supervise(actor: Actor, lifeCycle: LifeCycle) extends Server
case class RestartStrategy(scheme: FailOverScheme, maxNrOfRetries: Int, withinTimeRange: Int) extends ConfigElement
case class RestartStrategy(
scheme: FailOverScheme,
maxNrOfRetries: Int,
withinTimeRange: Int,
trapExceptions: List[Class[_ <: Throwable]]) extends ConfigElement
case object AllForOne extends FailOverScheme
case object OneForOne extends FailOverScheme
@ -44,6 +48,7 @@ object ScalaConfig {
val target: Class[_],
val lifeCycle: LifeCycle,
val timeout: Int,
val transactionRequired: Boolean,
_dispatcher: MessageDispatcher, // optional
_remoteAddress: RemoteAddress // optional
) extends Server {
@ -53,28 +58,52 @@ object ScalaConfig {
}
object Component {
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int) =
new Component(intf, target, lifeCycle, timeout, null, null)
new Component(intf, target, lifeCycle, timeout, false, null, null)
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int) =
new Component(null, target, lifeCycle, timeout, null, null)
new Component(null, target, lifeCycle, timeout, false, null, null)
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher) =
new Component(intf, target, lifeCycle, timeout, dispatcher, null)
new Component(intf, target, lifeCycle, timeout, false, dispatcher, null)
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher) =
new Component(null, target, lifeCycle, timeout, dispatcher, null)
new Component(null, target, lifeCycle, timeout, false, dispatcher, null)
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, remoteAddress: RemoteAddress) =
new Component(intf, target, lifeCycle, timeout, null, remoteAddress)
new Component(intf, target, lifeCycle, timeout, false, null, remoteAddress)
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int, remoteAddress: RemoteAddress) =
new Component(null, target, lifeCycle, timeout, null, remoteAddress)
new Component(null, target, lifeCycle, timeout, false, null, remoteAddress)
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
new Component(intf, target, lifeCycle, timeout, dispatcher, remoteAddress)
new Component(intf, target, lifeCycle, timeout, false, dispatcher, remoteAddress)
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
new Component(null, target, lifeCycle, timeout, dispatcher, remoteAddress)
new Component(null, target, lifeCycle, timeout, false, dispatcher, remoteAddress)
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean) =
new Component(intf, target, lifeCycle, timeout, transactionRequired, null, null)
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean) =
new Component(null, target, lifeCycle, timeout, transactionRequired, null, null)
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, dispatcher: MessageDispatcher) =
new Component(intf, target, lifeCycle, timeout, transactionRequired, dispatcher, null)
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, dispatcher: MessageDispatcher) =
new Component(null, target, lifeCycle, timeout, transactionRequired, dispatcher, null)
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, remoteAddress: RemoteAddress) =
new Component(intf, target, lifeCycle, timeout, transactionRequired, null, remoteAddress)
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, remoteAddress: RemoteAddress) =
new Component(null, target, lifeCycle, timeout, transactionRequired, null, remoteAddress)
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
new Component(intf, target, lifeCycle, timeout, transactionRequired, dispatcher, remoteAddress)
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
new Component(null, target, lifeCycle, timeout, transactionRequired, dispatcher, remoteAddress)
}
}
@ -89,9 +118,10 @@ object JavaConfig {
class RestartStrategy(
@BeanProperty val scheme: FailOverScheme,
@BeanProperty val maxNrOfRetries: Int,
@BeanProperty val withinTimeRange: Int) extends ConfigElement {
@BeanProperty val withinTimeRange: Int,
@BeanProperty val trapExceptions: Array[Class[_ <: Throwable]]) extends ConfigElement {
def transform = se.scalablesolutions.akka.config.ScalaConfig.RestartStrategy(
scheme.transform, maxNrOfRetries, withinTimeRange)
scheme.transform, maxNrOfRetries, withinTimeRange, trapExceptions.toList)
}
class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val callbacks: RestartCallbacks) extends ConfigElement {
@ -133,33 +163,56 @@ object JavaConfig {
@BeanProperty val target: Class[_],
@BeanProperty val lifeCycle: LifeCycle,
@BeanProperty val timeout: Int,
@BeanProperty val transactionRequired: Boolean, // optional
@BeanProperty val dispatcher: MessageDispatcher, // optional
@BeanProperty val remoteAddress: RemoteAddress // optional
) extends Server {
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int) =
this(intf, target, lifeCycle, timeout, null, null)
this(intf, target, lifeCycle, timeout, false, null, null)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int) =
this(null, target, lifeCycle, timeout, null, null)
this(null, target, lifeCycle, timeout, false, null, null)
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, remoteAddress: RemoteAddress) =
this(intf, target, lifeCycle, timeout, null, remoteAddress)
this(intf, target, lifeCycle, timeout, false, null, remoteAddress)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int, remoteAddress: RemoteAddress) =
this(null, target, lifeCycle, timeout, null, remoteAddress)
this(null, target, lifeCycle, timeout, false, null, remoteAddress)
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher) =
this(intf, target, lifeCycle, timeout, dispatcher, null)
this(intf, target, lifeCycle, timeout, false, dispatcher, null)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher) =
this(null, target, lifeCycle, timeout, dispatcher, null)
this(null, target, lifeCycle, timeout, false, dispatcher, null)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
this(null, target, lifeCycle, timeout, dispatcher, remoteAddress)
this(null, target, lifeCycle, timeout, false, dispatcher, remoteAddress)
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean) =
this(intf, target, lifeCycle, timeout, transactionRequired, null, null)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean) =
this(null, target, lifeCycle, timeout, transactionRequired, null, null)
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, remoteAddress: RemoteAddress) =
this(intf, target, lifeCycle, timeout, transactionRequired, null, remoteAddress)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, remoteAddress: RemoteAddress) =
this(null, target, lifeCycle, timeout, transactionRequired, null, remoteAddress)
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, dispatcher: MessageDispatcher) =
this(intf, target, lifeCycle, timeout, transactionRequired, dispatcher, null)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, dispatcher: MessageDispatcher) =
this(null, target, lifeCycle, timeout, transactionRequired, dispatcher, null)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int, transactionRequired: Boolean, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
this(null, target, lifeCycle, timeout, transactionRequired, dispatcher, remoteAddress)
def transform =
se.scalablesolutions.akka.config.ScalaConfig.Component(intf, target, lifeCycle.transform, timeout, dispatcher,
se.scalablesolutions.akka.config.ScalaConfig.Component(
intf, target, lifeCycle.transform, timeout, transactionRequired, dispatcher,
if (remoteAddress != null) se.scalablesolutions.akka.config.ScalaConfig.RemoteAddress(remoteAddress.hostname, remoteAddress.port) else null)
def newSupervised(actor: Actor) =

View file

@ -6,7 +6,10 @@ package se.scalablesolutions.akka.config
import ScalaConfig.{RestartStrategy, Component}
trait Configurator {
/**
* Manages the active abject or actor that has been put under supervision for the class specified.
*/
private[akka] trait Configurator {
/**
* Returns the active abject or actor that has been put under supervision for the class specified.
*

View file

@ -70,7 +70,8 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
* // For example, if you have a List with TransactionalRef
* val refs: List[TransactionalRef] = ...
*
* // You can use them together with Transaction in a for comprehension since TransactionalRef is also monadic
* // You can use them together with Transaction in a for comprehension since
* // TransactionalRef is also monadic
* for {
* tx <- Transaction
* ref <- refs
@ -107,7 +108,8 @@ object Transaction extends TransactionManagement {
def foreach(f: Transaction => Unit): Unit = atomic { f(getTransactionInScope) }
/**
* Creates a "pure" STM atomic transaction and by-passes all transactions hooks such as persistence etc.
* Creates a "pure" STM atomic transaction and by-passes all transactions hooks
* such as persistence etc.
* Only for internal usage.
*/
private[akka] def pureAtomic[T](body: => T): T = new AtomicTemplate[T](
@ -246,13 +248,18 @@ object Transaction extends TransactionManagement {
private[akka] def register(uuid: String, storage: Committable) = persistentStateMap.put(uuid, storage)
private def ensureIsActive = if (status != TransactionStatus.Active)
throw new IllegalStateException("Expected ACTIVE transaction - current status [" + status + "]: " + toString)
throw new IllegalStateException(
"Expected ACTIVE transaction - current status [" + status + "]: " + toString)
private def ensureIsActiveOrAborted = if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted))
throw new IllegalStateException("Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString)
private def ensureIsActiveOrAborted =
if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted))
throw new IllegalStateException(
"Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString)
private def ensureIsActiveOrNew = if (!(status == TransactionStatus.Active || status == TransactionStatus.New))
throw new IllegalStateException("Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
private def ensureIsActiveOrNew =
if (!(status == TransactionStatus.Active || status == TransactionStatus.New))
throw new IllegalStateException(
"Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
// For reinitialize transaction after sending it over the wire
private[akka] def reinit = synchronized {

View file

@ -8,24 +8,20 @@ import java.util.concurrent.atomic.AtomicBoolean
import se.scalablesolutions.akka.util.Logging
import scala.collection.mutable.HashSet
import org.multiverse.api.ThreadLocalTransaction._
class StmException(msg: String) extends RuntimeException(msg)
class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
class TransactionAwareWrapperException(
val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
override def toString(): String = "TransactionAwareWrapperException[" + cause + ", " + tx + "]"
}
object TransactionManagement extends TransactionManagement {
import se.scalablesolutions.akka.Config._
// FIXME reenable 'akka.stm.restart-on-collision' when new STM is in place
val RESTART_TRANSACTION_ON_COLLISION = false //akka.Kernel.config.getBool("akka.stm.restart-on-collision", true)
val TIME_WAITING_FOR_COMPLETION = config.getInt("akka.stm.wait-for-completion", 1000)
val NR_OF_TIMES_WAITING_FOR_COMPLETION = config.getInt("akka.stm.wait-nr-of-times", 3)
val MAX_NR_OF_RETRIES = config.getInt("akka.stm.max-nr-of-retries", 100)
val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", false))
val MAX_NR_OF_RETRIES = config.getInt("akka.stm.max-nr-of-retries", 100)
val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", false))
def isTransactionalityEnabled = TRANSACTION_ENABLED.get
def disableTransactions = TRANSACTION_ENABLED.set(false)
@ -37,7 +33,6 @@ object TransactionManagement extends TransactionManagement {
trait TransactionManagement extends Logging {
import TransactionManagement.currentTransaction
private[akka] val activeTransactions = new HashSet[Transaction]
private[akka] def createNewTransaction = currentTransaction.set(Some(new Transaction))
@ -60,7 +55,5 @@ trait TransactionManagement extends Logging {
private[akka] def incrementTransaction = if (isTransactionInScope) getTransactionInScope.increment
private[akka] def decrementTransaction = if (isTransactionInScope) getTransactionInScope.decrement
private[akka] def removeTransactionIfTopLevel(tx: Transaction) = if (tx.isTopLevel) { activeTransactions -= tx }
}

View file

@ -44,8 +44,7 @@ object TransactionalState {
*/
@serializable
trait Transactional {
// FIXME: won't work across the remote machines, use [http://johannburkard.de/software/uuid/]
var uuid = Uuid.newUuid.toString
val uuid: String
}
/**
@ -75,6 +74,7 @@ object TransactionalRef {
*/
class TransactionalRef[T] extends Transactional {
import org.multiverse.api.ThreadLocalTransaction._
val uuid = Uuid.newUuid.toString
private[this] val ref: Ref[T] = atomic { new Ref }
@ -126,6 +126,8 @@ object TransactionalMap {
*/
class TransactionalMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] {
protected[this] val ref = TransactionalRef[HashTrie[K, V]]
val uuid = Uuid.newUuid.toString
ref.swap(new HashTrie[K, V])
def -=(key: K) = remove(key)
@ -176,7 +178,10 @@ object TransactionalVector {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class TransactionalVector[T] extends Transactional with RandomAccessSeq[T] {
val uuid = Uuid.newUuid.toString
private[this] val ref = TransactionalRef[Vector[T]]
ref.swap(EmptyVector)
def clear = ref.swap(EmptyVector)

View file

@ -0,0 +1,32 @@
package se.scalablesolutions.akka.actor
import junit.framework.TestCase
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import scala.collection.mutable.HashSet
class MemoryFootprintTest extends JUnitSuite {
class Mem extends Actor {
def receive = {
case _ => {}
}
}
@Test
def shouldCreateManyActors = {
/* println("============== MEMORY TEST ==============")
val actors = new HashSet[Actor]
println("Total memory: " + Runtime.getRuntime.totalMemory)
(1 until 1000000).foreach {i =>
val mem = new Mem
actors += mem
if ((i % 100000) == 0) {
println("Nr actors: " + i)
println("Total memory: " + (Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory))
}
}
*/
assert(true)
}
}

View file

@ -471,7 +471,7 @@ class RemoteSupervisorTest extends JUnitSuite {
val factory = SupervisorFactory(
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
@ -486,7 +486,7 @@ class RemoteSupervisorTest extends JUnitSuite {
val factory = SupervisorFactory(
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100),
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
@ -504,7 +504,7 @@ class RemoteSupervisorTest extends JUnitSuite {
val factory = SupervisorFactory(
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
@ -530,7 +530,7 @@ class RemoteSupervisorTest extends JUnitSuite {
val factory = SupervisorFactory(
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100),
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
@ -556,13 +556,13 @@ class RemoteSupervisorTest extends JUnitSuite {
val factory = SupervisorFactory(
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
::
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong2,
LifeCycle(Permanent))

View file

@ -457,7 +457,7 @@ class SupervisorTest extends JUnitSuite {
val factory = SupervisorFactory(
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
@ -470,7 +470,7 @@ class SupervisorTest extends JUnitSuite {
val factory = SupervisorFactory(
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100),
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
@ -485,7 +485,7 @@ class SupervisorTest extends JUnitSuite {
val factory = SupervisorFactory(
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
@ -508,7 +508,7 @@ class SupervisorTest extends JUnitSuite {
val factory = SupervisorFactory(
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100),
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
@ -531,13 +531,13 @@ class SupervisorTest extends JUnitSuite {
val factory = SupervisorFactory(
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
::
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
Supervise(
pingpong2,
LifeCycle(Permanent))

View file

@ -4,7 +4,7 @@
package se.scalablesolutions.akka.amqp
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.Actor.Sender.Self
import com.rabbitmq.client.ConnectionParameters
@ -30,10 +30,8 @@ object ExampleSession {
def direct = {
val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, IM, ExchangeType.Direct, None, 100, false, false, Map[String, AnyRef]())
consumer ! MessageConsumerListener("@george_bush", "direct", new Actor() {
def receive = {
case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
}
consumer ! MessageConsumerListener("@george_bush", "direct", actor {
case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
})
val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, IM, None, None, 100)
producer ! Message("@jonas_boner: You sucked!!".getBytes, "direct")
@ -41,15 +39,11 @@ object ExampleSession {
def fanout = {
val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, CHAT, ExchangeType.Fanout, None, 100, false, false, Map[String, AnyRef]())
consumer ! MessageConsumerListener("@george_bush", "", new Actor() {
def receive = {
case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
}
consumer ! MessageConsumerListener("@george_bush", "", actor {
case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
})
consumer ! MessageConsumerListener("@barack_obama", "", new Actor() {
def receive = {
case Message(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
}
consumer ! MessageConsumerListener("@barack_obama", "", actor {
case Message(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
})
val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, CHAT, None, None, 100)
producer ! Message("@jonas_boner: I'm going surfing".getBytes, "")

View file

@ -37,7 +37,8 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase {
bind(Ext.class).to(ExtImpl.class).in(Scopes.SINGLETON);
}
}).configure(
new RestartStrategy(new AllForOne(), 3, 5000), new Component[]{
new RestartStrategy(new AllForOne(), 3, 5000, new Class[]{Exception.class}),
new Component[]{
new Component(
Foo.class,
new LifeCycle(new Permanent()),

View file

@ -19,7 +19,7 @@ public class InMemNestedStateTest extends TestCase {
public InMemNestedStateTest() {
conf.configure(
new RestartStrategy(new AllForOne(), 3, 5000),
new RestartStrategy(new AllForOne(), 3, 5000, new Class[]{Exception.class}),
new Component[]{
// FIXME: remove string-name, add ctor to only accept target class
new Component(InMemStateful.class, new LifeCycle(new Permanent()), 10000000),

View file

@ -23,7 +23,7 @@ public class InMemoryStateTest extends TestCase {
public InMemoryStateTest() {
Config.config();
conf.configure(
new RestartStrategy(new AllForOne(), 3, 5000),
new RestartStrategy(new AllForOne(), 3, 5000, new Class[] {Exception.class}),
new Component[]{
new Component(InMemStateful.class,
new LifeCycle(new Permanent()),

View file

@ -20,7 +20,7 @@ public class PersistentNestedStateTest extends TestCase {
protected void setUp() {
PersistenceManager.init();
conf.configure(
new RestartStrategy(new AllForOne(), 3, 5000),
new RestartStrategy(new AllForOne(), 3, 5000, new Class[] {Exception.class}),
new Component[]{
// FIXME: remove string-name, add ctor to only accept target class
new Component(PersistentStateful.class, new LifeCycle(new Permanent()), 10000000),

View file

@ -17,7 +17,7 @@ public class PersistentStateTest extends TestCase {
protected void setUp() {
PersistenceManager.init();
conf.configure(
new RestartStrategy(new AllForOne(), 3, 5000),
new RestartStrategy(new AllForOne(), 3, 5000, new Class[] {Exception.class}),
new Component[] {
new Component(PersistentStateful.class, new LifeCycle(new Permanent()), 10000000),
new Component(PersistentFailer.class, new LifeCycle(new Permanent()), 1000)

View file

@ -17,7 +17,7 @@ public class RemotePersistentStateTest extends TestCase {
protected void setUp() {
PersistenceManager.init();
conf.configure(
new RestartStrategy(new AllForOne(), 3, 5000),
new RestartStrategy(new AllForOne(), 3, 5000, new Class[]{Exception.class}),
new Component[] {
new Component(PersistentStateful.class, new LifeCycle(new Permanent()), 1000000, new RemoteAddress("localhost", 9999)),
new Component(PersistentFailer.class, new LifeCycle(new Permanent()), 1000000, new RemoteAddress("localhost", 9999))

View file

@ -32,7 +32,7 @@ public class RestTest extends TestCase {
@BeforeClass
protected void setUp() {
conf.configure(
new RestartStrategy(new AllForOne(), 3, 5000),
new RestartStrategy(new AllForOne(), 3, 5000, new Class[]{Exception.class}),
new Component[] {
new Component(
JerseyFoo.class,

View file

@ -16,6 +16,8 @@ import se.scalablesolutions.akka.nio.RemoteNode
import se.scalablesolutions.akka.util.Logging
/**
* The Akka Kernel.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Kernel extends Logging {
@ -36,9 +38,18 @@ object Kernel extends Logging {
def main(args: Array[String]) = boot
def boot = synchronized {
/**
* Boots up the Kernel.
*/
def boot: Unit = boot(true)
/**
* Boots up the Kernel.
* If you pass in false as parameter then the Akka banner is not printed out.
*/
def boot(withBanner: Boolean): Unit = synchronized {
if (!hasBooted) {
printBanner
if (withBanner) printBanner
log.info("Starting Akka...")
runApplicationBootClasses
@ -50,7 +61,7 @@ object Kernel extends Logging {
hasBooted = true
}
}
def startRemoteService = {
// FIXME manage remote serve thread for graceful shutdown
val remoteServerThread = new Thread(new Runnable() {
@ -112,125 +123,16 @@ object Kernel extends Logging {
private def printBanner = {
log.info(
"""==============================
__ __
_____ | | _| | _______
\__ \ | |/ / |/ /\__ \
/ __ \| <| < / __ \_
(____ /__|_ \__|_ \(____ /
\/ \/ \/ \/
"""
==============================
__ __
_____ | | _| | _______
\__ \ | |/ / |/ /\__ \
/ __ \| <| < / __ \_
(____ /__|_ \__|_ \(____ /
\/ \/ \/ \/
""")
log.info(" Running version " + VERSION)
log.info("==============================")
}
private def cassandraBenchmark = {
import se.scalablesolutions.akka.state.CassandraStorage
val NR_ENTRIES = 100000
println("=================================================")
var start = System.currentTimeMillis
for (i <- 1 to NR_ENTRIES) CassandraStorage.insertMapStorageEntryFor("test", i.toString, "data")
var end = System.currentTimeMillis
println("Writes per second: " + NR_ENTRIES / ((end - start).toDouble / 1000))
println("=================================================")
start = System.currentTimeMillis
val entries = new scala.collection.mutable.ArrayBuffer[Tuple2[String, String]]
for (i <- 1 to NR_ENTRIES) entries += (i.toString, "data")
CassandraStorage.insertMapStorageEntriesFor("test", entries.toList)
end = System.currentTimeMillis
println("Writes per second - batch: " + NR_ENTRIES / ((end - start).toDouble / 1000))
println("=================================================")
start = System.currentTimeMillis
for (i <- 1 to NR_ENTRIES) CassandraStorage.getMapStorageEntryFor("test", i.toString)
end = System.currentTimeMillis
println("Reads per second: " + NR_ENTRIES / ((end - start).toDouble / 1000))
System.exit(0)
}
}
/*
//import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory}
//import voldemort.server.{VoldemortConfig, VoldemortServer}
//import voldemort.versioning.Versioned
private[this] var storageFactory: StoreClientFactory = _
private[this] var storageServer: VoldemortServer = _
*/
// private[akka] def startVoldemort = {
// val VOLDEMORT_SERVER_URL = "tcp://" + SERVER_URL
// val VOLDEMORT_SERVER_PORT = 6666
// val VOLDEMORT_BOOTSTRAP_URL = VOLDEMORT_SERVER_URL + ":" + VOLDEMORT_SERVER_PORT
// // Start Voldemort server
// val config = VoldemortConfig.loadFromVoldemortHome(Boot.HOME)
// storageServer = new VoldemortServer(config)
// storageServer.start
// log.info("Replicated persistent storage server started at %s", VOLDEMORT_BOOTSTRAP_URL)
//
// // Create Voldemort client factory
// val numThreads = 10
// val maxQueuedRequests = 10
// val maxConnectionsPerNode = 10
// val maxTotalConnections = 100
// storageFactory = new SocketStoreClientFactory(
// numThreads,
// numThreads,
// maxQueuedRequests,
// maxConnectionsPerNode,
// maxTotalConnections,
// VOLDEMORT_BOOTSTRAP_URL)
//
// val name = this.getClass.getName
// val storage = getStorageFor("actors")
//// val value = storage.get(name)
// val value = new Versioned("state")
// //value.setObject("state")
// storage.put(name, value)
// }
//
// private[akka] def getStorageFor(storageName: String): StoreClient[String, String] =
// storageFactory.getStoreClient(storageName)
// private[akka] def startZooKeeper = {
//import org.apache.zookeeper.jmx.ManagedUtil
//import org.apache.zookeeper.server.persistence.FileTxnSnapLog
//import org.apache.zookeeper.server.ServerConfig
//import org.apache.zookeeper.server.NIOServerCnxn
// val ZOO_KEEPER_SERVER_URL = SERVER_URL
// val ZOO_KEEPER_SERVER_PORT = 9898
// try {
// ManagedUtil.registerLog4jMBeans
// ServerConfig.parse(args)
// } catch {
// case e: JMException => log.warning("Unable to register log4j JMX control: s%", e)
// case e => log.fatal("Error in ZooKeeper config: s%", e)
// }
// val factory = new ZooKeeperServer.Factory() {
// override def createConnectionFactory = new NIOServerCnxn.Factory(ServerConfig.getClientPort)
// override def createServer = {
// val server = new ZooKeeperServer
// val txLog = new FileTxnSnapLog(
// new File(ServerConfig.getDataLogDir),
// new File(ServerConfig.getDataDir))
// server.setTxnLogFactory(txLog)
// server
// }
// }
// try {
// val zooKeeper = factory.createServer
// zooKeeper.startup
// log.info("ZooKeeper started")
// // TODO: handle clean shutdown as below in separate thread
// // val cnxnFactory = serverFactory.createConnectionFactory
// // cnxnFactory.setZooKeeperServer(zooKeeper)
// // cnxnFactory.join
// // if (zooKeeper.isRunning) zooKeeper.shutdown
// } catch { case e => log.fatal("Unexpected exception: s%",e) }
// }
}

View file

@ -4,9 +4,11 @@
package se.scalablesolutions.akka.state
import util.Logging
import se.scalablesolutions.akka.stm.TransactionManagement.currentTransaction
import se.scalablesolutions.akka.collection._
import se.scalablesolutions.akka.util.Logging
import org.codehaus.aspectwerkz.proxy.Uuid
class NoTransactionInScopeException extends RuntimeException
@ -18,34 +20,67 @@ case class TokyoCabinetStorageConfig() extends PersistentStorageConfig
case class MongoStorageConfig() extends PersistentStorageConfig
/**
* Example Scala usage:
* Example Scala usage.
* <p/>
* New map with generated id.
* <pre>
* val myMap = PersistentState.newMap(CassandraStorageConfig)
* </pre>
* <p/>
*
* New map with user-defined id.
* <pre>
* val myMap = PersistentState.newMap(CassandraStorageConfig, id)
* </pre>
*
* Get map by user-defined id.
* <pre>
* val myMap = PersistentState.getMap(CassandraStorageConfig, id)
* </pre>
*
* Example Java usage:
* <pre>
* TransactionalMap myMap = PersistentState.newMap(new CassandraStorageConfig());
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object PersistentState {
def newMap(config: PersistentStorageConfig): PersistentMap = config match {
case CassandraStorageConfig() => new CassandraPersistentMap
case MongoStorageConfig() => new MongoPersistentMap
def newMap(config: PersistentStorageConfig): PersistentMap =
// FIXME: won't work across the remote machines, use [http://johannburkard.de/software/uuid/]
newMap(config, Uuid.newUuid.toString)
def newVector(config: PersistentStorageConfig): PersistentVector =
newVector(config, Uuid.newUuid.toString)
def newRef(config: PersistentStorageConfig): PersistentRef =
newRef(config, Uuid.newUuid.toString)
def getMap(config: PersistentStorageConfig, id: String): PersistentMap =
newMap(config, id)
def getVector(config: PersistentStorageConfig, id: String): PersistentVector =
newVector(config, id)
def getRef(config: PersistentStorageConfig, id: String): PersistentRef =
newRef(config, id)
def newMap(config: PersistentStorageConfig, id: String): PersistentMap = config match {
case CassandraStorageConfig() => new CassandraPersistentMap(id)
case MongoStorageConfig() => new MongoPersistentMap(id)
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
def newVector(config: PersistentStorageConfig): PersistentVector = config match {
case CassandraStorageConfig() => new CassandraPersistentVector
case MongoStorageConfig() => new MongoPersistentVector
def newVector(config: PersistentStorageConfig, id: String): PersistentVector = config match {
case CassandraStorageConfig() => new CassandraPersistentVector(id)
case MongoStorageConfig() => new MongoPersistentVector(id)
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
def newRef(config: PersistentStorageConfig): PersistentRef = config match {
case CassandraStorageConfig() => new CassandraPersistentRef
case MongoStorageConfig() => new MongoPersistentRef
def newRef(config: PersistentStorageConfig, id: String): PersistentRef = config match {
case CassandraStorageConfig() => new CassandraPersistentRef(id)
case MongoStorageConfig() => new MongoPersistentRef(id)
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
@ -60,7 +95,8 @@ object PersistentState {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] with Transactional with Committable with Logging {
trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef]
with Transactional with Committable with Logging {
protected val newAndUpdatedEntries = TransactionalState.newMap[AnyRef, AnyRef]
protected val removedEntries = TransactionalState.newVector[AnyRef]
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
@ -71,7 +107,8 @@ trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] with Tr
def commit = {
storage.removeMapStorageFor(uuid, removedEntries.toList)
storage.insertMapStorageEntriesFor(uuid, newAndUpdatedEntries.toList)
if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get.get) storage.removeMapStorageFor(uuid)
if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get.get)
storage.removeMapStorageFor(uuid)
newAndUpdatedEntries.clear
removedEntries.clear
}
@ -95,9 +132,11 @@ trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] with Tr
removedEntries.add(key)
}
def slice(start: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] = slice(start, None, count)
def slice(start: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] =
slice(start, None, count)
def slice(start: Option[AnyRef], finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] = try {
def slice(start: Option[AnyRef], finish: Option[AnyRef], count: Int):
List[Tuple2[AnyRef, AnyRef]] = try {
storage.getMapStorageRangeFor(uuid, start, finish, count)
} catch { case e: Exception => Nil }
@ -107,7 +146,8 @@ trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] with Tr
}
override def contains(key: AnyRef): Boolean = try {
newAndUpdatedEntries.contains(key) || storage.getMapStorageEntryFor(uuid, key).isDefined
newAndUpdatedEntries.contains(key) ||
storage.getMapStorageEntryFor(uuid, key).isDefined
} catch { case e: Exception => false }
override def size: Int = try {
@ -148,22 +188,22 @@ trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] with Tr
}
/**
* Implements a persistent transaction
al map based on the Cassandra distributed P2P key-value storage.
* Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
*
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class CassandraPersistentMap extends PersistentMap {
class CassandraPersistentMap(id: String) extends PersistentMap {
val uuid = id
val storage = CassandraStorage
}
/**
* Implements a persistent transactional map based on the MongoDB distributed P2P key-value storage.
* Implements a persistent transactional map based on the MongoDB document storage.
*
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
class MongoPersistentMap extends PersistentMap {
class MongoPersistentMap(id: String) extends PersistentMap {
val uuid = id
val storage = MongoStorage
}
@ -244,20 +284,24 @@ trait PersistentVector extends RandomAccessSeq[AnyRef] with Transactional with C
}
/**
* Implements a persistent transactional vector based on the Cassandra distributed P2P key-value storage.
* Implements a persistent transactional vector based on the Cassandra
* distributed P2P key-value storage.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class CassandraPersistentVector extends PersistentVector {
class CassandraPersistentVector(id: String) extends PersistentVector {
val uuid = id
val storage = CassandraStorage
}
/**
* Implements a persistent transactional vector based on the MongoDB distributed P2P key-value storage.
* Implements a persistent transactional vector based on the MongoDB
* document storage.
*
* @author <a href="http://debasishg.blogspot.com">Debaissh Ghosh</a>
*/
class MongoPersistentVector extends PersistentVector {
class MongoPersistentVector(id: String) extends PersistentVector {
val uuid = id
val storage = MongoStorage
}
@ -297,10 +341,12 @@ trait PersistentRef extends Transactional with Committable {
}
}
class CassandraPersistentRef extends PersistentRef {
class CassandraPersistentRef(id: String) extends PersistentRef {
val uuid = id
val storage = CassandraStorage
}
class MongoPersistentRef extends PersistentRef {
class MongoPersistentRef(id: String) extends PersistentRef {
val uuid = id
val storage = MongoStorage
}

View file

@ -8,7 +8,7 @@ public class Boot {
public Boot() throws Exception {
manager.configure(
new RestartStrategy(new OneForOne(), 3, 5000),
new RestartStrategy(new OneForOne(), 3, 5000, new Class[]{Exception.class}),
new Component[] {
new Component(
sample.java.SimpleService.class,

View file

@ -38,7 +38,7 @@ class Boot {
val factory = SupervisorFactory(
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100),
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(
new SimpleService,
LifeCycle(Permanent)) ::

View file

@ -20,7 +20,7 @@ import org.atmosphere.cpr.BroadcastFilter
class Boot {
val factory = SupervisorFactory(
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100),
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(
new SimpleService,
LifeCycle(Permanent)) ::

View file

@ -13,7 +13,7 @@ import se.scalablesolutions.akka.state.TransactionalState
class Boot {
val factory = SupervisorFactory(
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100),
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
// Dummy implementations of all authentication actors
// see akka.conf to enable one of these for the AkkaSecurityFilterFactory
Supervise(

View file

@ -62,6 +62,7 @@ see http://maven.apache.org/plugins/maven-changes-plugin/usage.html for full gui
<action dev="Jonas Bon&#233;r" type="add">New URL: http://akkasource.org</action>
<action dev="Jonas Bon&#233;r" type="add">Enhanced trapping of failures: 'trapExit = List(classOf[..], classOf[..])'</action>
<action dev="Jonas Bon&#233;r" type="add">Upgraded to Netty 3.2, Protobuf 2.2, ScalaTest 1.0, Jersey 1.1.3, Atmosphere 0.4.1, Cassandra 0.4.1, Configgy 1.4</action>
<action dev="Jonas Bon&#233;r" type="fix">Lowered actor memory footprint; now an actor consumes ~625 bytes, which mean that you can create 6.5 million on 4 G RAM</action>
<action dev="Jonas Bon&#233;r" type="fix">Concurrent mode is now per actor basis</action>
<action dev="Jonas Bon&#233;r" type="fix">Remote actors are now defined by their UUID (not class name)</action>
<action dev="Jonas Bon&#233;r" type="fix">Fixed dispatcher bug</action>

View file

@ -28,6 +28,7 @@
<stm>
service = on
max-nr-of-retries = 100
distributed = off # not implemented yet
</stm>

View file

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.facebook</groupId>
<artifactId>fb303</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
</project>

View file

@ -1,8 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.rabbitmg</groupId>
<artifactId>rabbitmg-client</artifactId>
<groupId>com.rabbitmq</groupId>
<artifactId>rabbitmq-client</artifactId>
<version>0.9.1</version>
<packaging>jar</packaging>
</project>

View file

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.twitter</groupId>
<artifactId>scala-stats</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
</project>