mid merge

This commit is contained in:
Jonas Bonér 2010-04-27 21:31:20 +02:00
commit ebaead3cad
44 changed files with 1743 additions and 249 deletions

3
.gitignore vendored
View file

@ -1,5 +1,6 @@
*~
*#
project/plugins/project/
project/boot/*
*/project/build/target
*/project/boot
@ -32,4 +33,4 @@ tm.out
.classpath
.idea
.scala_dependencies
multiverse.log

View file

@ -162,7 +162,7 @@ trait Producer { self: Actor =>
*/
class ProducerResponseSender(
headers: Map[String, Any],
replyTo : Option[Either[Actor,CompletableFuture]],
replyTo : Option[Either[Actor,CompletableFuture[Any]]],
producer: Actor) extends Synchronization with Logging {
implicit val producerActor = Some(producer) // the response sender

View file

@ -4,6 +4,7 @@
package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.config.FaultHandlingStrategy
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future}
@ -28,6 +29,44 @@ object Annotations {
val inittransactionalstate = classOf[inittransactionalstate]
}
/**
* Configuration factory for Active Objects.
*
* FIXDOC: document ActiveObjectConfiguration
*/
final class ActiveObjectConfiguration {
private[akka] var _timeout: Long = Actor.TIMEOUT
private[akka] var _restartCallbacks: Option[RestartCallbacks] = None
private[akka] var _transactionRequired = false
private[akka] var _host: Option[InetSocketAddress] = None
private[akka] var _messageDispatcher: Option[MessageDispatcher] = None
def timeout(timeout: Long) : ActiveObjectConfiguration = {
_timeout = timeout
this
}
def restartCallbacks(pre: String, post: String) : ActiveObjectConfiguration = {
_restartCallbacks = Some(new RestartCallbacks(pre, post))
this
}
def makeTransactionRequired() : ActiveObjectConfiguration = {
_transactionRequired = true;
this
}
def makeRemote(hostname: String, port: Int) : ActiveObjectConfiguration = {
_host = Some(new InetSocketAddress(hostname, port))
this
}
def dispatcher(messageDispatcher: MessageDispatcher) : ActiveObjectConfiguration = {
_messageDispatcher = Some(messageDispatcher)
this
}
}
/**
* Factory class for creating Active Objects out of plain POJOs and/or POJOs with interfaces.
*
@ -35,146 +74,196 @@ object Annotations {
*/
object ActiveObject {
val AKKA_CAMEL_ROUTING_SCHEME = "akka"
private[actor] val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
private[actor] val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
def newInstance[T](target: Class[T], timeout: Long): T =
newInstance(target, new Dispatcher(false, None), None, timeout)
def newInstance[T](target: Class[T]): T =
newInstance(target, new Dispatcher(false, None), None, Actor.TIMEOUT)
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T =
newInstance(intf, target, new Dispatcher(false, None), None, timeout)
def newInstance[T](intf: Class[T], target: AnyRef): T =
newInstance(intf, target, new Dispatcher(false, None), None, Actor.TIMEOUT)
def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T =
newInstance(target, new Dispatcher(false, None), Some(new InetSocketAddress(hostname, port)), timeout)
def newRemoteInstance[T](target: Class[T], hostname: String, port: Int): T =
newInstance(target, new Dispatcher(false, None), Some(new InetSocketAddress(hostname, port)), Actor.TIMEOUT)
def newInstance[T](target: Class[T], config: ActiveObjectConfiguration): T = {
val actor = new Dispatcher(config._transactionRequired, config._restartCallbacks)
if (config._messageDispatcher.isDefined) {
actor.messageDispatcher = config._messageDispatcher.get
}
newInstance(target, actor, config._host, config._timeout)
}
def newInstance[T](intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration): T = {
val actor = new Dispatcher(config._transactionRequired, config._restartCallbacks)
if (config._messageDispatcher.isDefined) {
actor.messageDispatcher = config._messageDispatcher.get
}
newInstance(intf, target, actor, config._host, config._timeout)
}
@deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead")
def newInstance[T](target: Class[T], timeout: Long, restartCallbacks: Option[RestartCallbacks]): T =
newInstance(target, new Dispatcher(false, restartCallbacks), None, timeout)
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T =
newInstance(intf, target, new Dispatcher(false, None), None, timeout)
@deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead")
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, restartCallbacks: Option[RestartCallbacks]): T =
newInstance(intf, target, new Dispatcher(false, restartCallbacks), None, timeout)
@deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead")
def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean): T =
newInstance(target, new Dispatcher(transactionRequired, None), None, timeout)
@deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead")
def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, restartCallbacks: Option[RestartCallbacks]): T =
newInstance(target, new Dispatcher(transactionRequired, restartCallbacks), None, timeout)
@deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead")
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean): T =
newInstance(intf, target, new Dispatcher(transactionRequired, None), None, timeout)
@deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead")
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, restartCallbacks: Option[RestartCallbacks]): T =
newInstance(intf, target, new Dispatcher(transactionRequired, restartCallbacks), None, timeout)
def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T =
newInstance(target, new Dispatcher(false, None), Some(new InetSocketAddress(hostname, port)), timeout)
def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T =
newInstance(target, new Dispatcher(false, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout)
@deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead")
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int): T =
newInstance(intf, target, new Dispatcher(false, None), Some(new InetSocketAddress(hostname, port)), timeout)
@deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead")
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T =
newInstance(intf, target, new Dispatcher(false, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout)
@deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead")
def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, hostname: String, port: Int): T =
newInstance(target, new Dispatcher(transactionRequired, None), Some(new InetSocketAddress(hostname, port)), timeout)
@deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead")
def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T =
newInstance(target, new Dispatcher(transactionRequired, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout)
@deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead")
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, hostname: String, port: Int): T =
newInstance(intf, target, new Dispatcher(transactionRequired, None), Some(new InetSocketAddress(hostname, port)), timeout)
@deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead")
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T =
newInstance(intf, target, new Dispatcher(transactionRequired, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout)
@deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead")
def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher): T = {
val actor = new Dispatcher(false, None)
actor.messageDispatcher = dispatcher
newInstance(target, actor, None, timeout)
}
@deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead")
def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(false, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(target, actor, None, timeout)
}
@deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead")
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher): T = {
val actor = new Dispatcher(false, None)
actor.messageDispatcher = dispatcher
newInstance(intf, target, actor, None, timeout)
}
@deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead")
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(false, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(intf, target, actor, None, timeout)
}
@deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead")
def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher): T = {
val actor = new Dispatcher(transactionRequired, None)
actor.messageDispatcher = dispatcher
newInstance(target, actor, None, timeout)
}
@deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead")
def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(transactionRequired, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(target, actor, None, timeout)
}
@deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead")
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher): T = {
val actor = new Dispatcher(transactionRequired, None)
actor.messageDispatcher = dispatcher
newInstance(intf, target, actor, None, timeout)
}
@deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead")
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(transactionRequired, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(intf, target, actor, None, timeout)
}
@deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead")
def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
val actor = new Dispatcher(false, None)
actor.messageDispatcher = dispatcher
newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
@deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead")
def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(false, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
@deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead")
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
val actor = new Dispatcher(false, None)
actor.messageDispatcher = dispatcher
newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
@deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead")
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(false, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
@deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead")
def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
val actor = new Dispatcher(transactionRequired, None)
actor.messageDispatcher = dispatcher
newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
@deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead")
def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(transactionRequired, restartCallbacks)
actor.messageDispatcher = dispatcher
newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
@deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead")
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
val actor = new Dispatcher(transactionRequired, None)
actor.messageDispatcher = dispatcher
newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
@deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead")
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
val actor = new Dispatcher(transactionRequired, restartCallbacks)
actor.messageDispatcher = dispatcher
@ -182,35 +271,94 @@ object ActiveObject {
}
private[akka] def newInstance[T](target: Class[T], actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val proxy = Proxy.newInstance(target, false, true)
val proxy = Proxy.newInstance(target, false, false)
actor.initialize(target, proxy)
actor.timeout = timeout
if (remoteAddress.isDefined) {
actor.makeRemote(remoteAddress.get)
}
AspectInitRegistry.register(proxy, AspectInit(target, actor, remoteAddress, timeout))
actor.start
proxy.asInstanceOf[T]
}
private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val proxy = Proxy.newInstance(Array(intf), Array(target), false, true)
val proxy = Proxy.newInstance(Array(intf), Array(target), false, false)
actor.initialize(target.getClass, target)
actor.timeout = timeout
if (remoteAddress.isDefined) {
actor.makeRemote(remoteAddress.get)
}
AspectInitRegistry.register(proxy, AspectInit(intf, actor, remoteAddress, timeout))
actor.start
proxy.asInstanceOf[T]
}
// Jan Kronquist: started work on issue 121
// def actorFor(obj: AnyRef): Option[Actor] = {
// ActorRegistry.actorsFor(classOf[Dispatcher]).find(a=>a.target == Some(obj))
// }
//
// def link(supervisor: AnyRef, activeObject: AnyRef) = {
// actorFor(supervisor).get !! Link(actorFor(activeObject).get)
// }
//
// def unlink(supervisor: AnyRef, activeObject: AnyRef) = {
// actorFor(supervisor).get !! Unlink(actorFor(activeObject).get)
// }
/**
* Get the underlying dispatcher actor for the given active object.
*/
def actorFor(obj: AnyRef): Option[Actor] = {
ActorRegistry.actorsFor(classOf[Dispatcher]).find(a=>a.target == Some(obj))
}
/**
* Links an other active object to this active object.
* @param supervisor the supervisor active object
* @param supervised the active object to link
*/
def link(supervisor: AnyRef, supervised: AnyRef) = {
val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't link when the supervisor is not an active object"))
val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't link when the supervised is not an active object"))
supervisorActor !! Link(supervisedActor)
}
/**
* Links an other active object to this active object and sets the fault handling for the supervisor.
* @param supervisor the supervisor active object
* @param supervised the active object to link
* @param handler fault handling strategy
* @param trapExceptions array of exceptions that should be handled by the supervisor
*/
def link(supervisor: AnyRef, supervised: AnyRef, handler: FaultHandlingStrategy, trapExceptions: Array[Class[_ <: Throwable]]) = {
val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't link when the supervisor is not an active object"))
val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't link when the supervised is not an active object"))
supervisorActor.trapExit = trapExceptions.toList
supervisorActor.faultHandler = Some(handler)
supervisorActor !! Link(supervisedActor)
}
/**
* Unlink the supervised active object from the supervisor.
* @param supervisor the supervisor active object
* @param supervised the active object to unlink
*/
def unlink(supervisor: AnyRef, supervised: AnyRef) = {
val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't unlink when the supervisor is not an active object"))
val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't unlink when the supervised is not an active object"))
supervisorActor !! Unlink(supervisedActor)
}
/**
* Sets the trap exit for the given supervisor active object.
* @param supervisor the supervisor active object
* @param trapExceptions array of exceptions that should be handled by the supervisor
*/
def trapExit(supervisor: AnyRef, trapExceptions: Array[Class[_ <: Throwable]]) = {
val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't set trap exceptions when the supervisor is not an active object"))
supervisorActor.trapExit = trapExceptions.toList
this
}
/**
* Sets the fault handling strategy for the given supervisor active object.
* @param supervisor the supervisor active object
* @param handler fault handling strategy
*/
def faultHandler(supervisor: AnyRef, handler: FaultHandlingStrategy) = {
val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't set fault handler when the supervisor is not an active object"))
supervisorActor.faultHandler = Some(handler)
this
}
private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor = {
val factory = SupervisorFactory(SupervisorConfig(restartStrategy, components))
@ -227,19 +375,19 @@ private[akka] object AspectInitRegistry {
val init = initializations.get(target)
initializations.remove(target)
init
}
}
def register(target: AnyRef, init: AspectInit) = initializations.put(target, init)
}
private[akka] sealed case class AspectInit(
val target: Class[_],
val actor: Dispatcher,
val actor: Dispatcher,
val remoteAddress: Option[InetSocketAddress],
val timeout: Long) {
def this(target: Class[_],actor: Dispatcher, timeout: Long) = this(target, actor, None, timeout)
}
/**
* AspectWerkz Aspect that is turning POJOs into Active Object.
* Is deployed on a 'per-instance' basis.
@ -260,7 +408,7 @@ private[akka] sealed class ActiveObjectAspect {
if (!isInitialized) {
val init = AspectInitRegistry.initFor(joinPoint.getThis)
target = init.target
actor = init.actor
actor = init.actor
remoteAddress = init.remoteAddress
timeout = init.timeout
isInitialized = true
@ -279,7 +427,7 @@ private[akka] sealed class ActiveObjectAspect {
(actor ! Invocation(joinPoint, true, true) ).asInstanceOf[AnyRef]
}
else {
val result = actor !! Invocation(joinPoint, false, isVoid(rtti))
val result = actor !! (Invocation(joinPoint, false, isVoid(rtti)), timeout)
if (result.isDefined) result.get
else throw new IllegalStateException("No result defined for invocation [" + joinPoint + "]")
}
@ -314,12 +462,12 @@ private[akka] sealed class ActiveObjectAspect {
}
}
private def getResultOrThrowException[T](future: Future): Option[T] =
private def getResultOrThrowException[T](future: Future[T]): Option[T] =
if (future.exception.isDefined) {
val (_, cause) = future.exception.get
throw cause
} else future.result.asInstanceOf[Option[T]]
} else future.result
private def isOneWay(rtti: MethodRtti) = rtti.getMethod.isAnnotationPresent(Annotations.oneway)
private def isVoid(rtti: MethodRtti) = rtti.getMethod.getReturnType == java.lang.Void.TYPE
@ -366,11 +514,11 @@ private[akka] sealed class ActiveObjectAspect {
}
// Jan Kronquist: started work on issue 121
// private[akka] case class Link(val actor: Actor)
private[akka] case class Link(val actor: Actor)
object Dispatcher {
val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]()
val ZERO_ITEM_OBJECT_ARRAY = Array[Object]()
val ZERO_ITEM_OBJECT_ARRAY = Array[Object]()
}
/**
@ -408,7 +556,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
"Could not find post restart method [" + post + "] \nin [" + targetClass.getName + "]. \nIt must have a zero argument definition.") })
}
// See if we have any annotation defined restart callbacks
// See if we have any annotation defined restart callbacks
if (!preRestart.isDefined) preRestart = methods.find(m => m.isAnnotationPresent(Annotations.prerestart))
if (!postRestart.isDefined) postRestart = methods.find(m => m.isAnnotationPresent(Annotations.postrestart))
@ -421,7 +569,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
if (preRestart.isDefined) preRestart.get.setAccessible(true)
if (postRestart.isDefined) postRestart.get.setAccessible(true)
// see if we have a method annotated with @inittransactionalstate, if so invoke it
initTxState = methods.find(m => m.isAnnotationPresent(Annotations.inittransactionalstate))
if (initTxState.isDefined && initTxState.get.getParameterTypes.length != 0) throw new IllegalStateException("Method annotated with @inittransactionalstate must have a zero argument definition")
@ -434,8 +582,8 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
if (isOneWay) joinPoint.proceed
else reply(joinPoint.proceed)
// Jan Kronquist: started work on issue 121
// case Link(target) =>
// link(target)
case Link(target) => link(target)
case Unlink(target) => unlink(target)
case unexpected =>
throw new IllegalStateException("Unexpected message [" + unexpected + "] sent to [" + this + "]")
}
@ -486,6 +634,6 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
if (!unserializable && hasMutableArgument) {
val copyOfArgs = Serializer.Java.deepClone(args)
joinPoint.getRtti.asInstanceOf[MethodRtti].setParameterValues(copyOfArgs.asInstanceOf[Array[AnyRef]])
}
}
}
}

View file

@ -52,6 +52,7 @@ case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifeCycleMe
case class Restart(reason: Throwable) extends LifeCycleMessage
case class Exit(dead: Actor, killer: Throwable) extends LifeCycleMessage
case class Unlink(child: Actor) extends LifeCycleMessage
case class UnlinkAndStop(child: Actor) extends LifeCycleMessage
case object Kill extends LifeCycleMessage
class ActorKilledException private[akka](message: String) extends RuntimeException(message)
@ -187,6 +188,43 @@ final class ActorRef private (val actor: Actor) {
} else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala
=======
}
/**
* Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model':
* <a href="http://en.wikipedia.org/wiki/Actor_model">http://en.wikipedia.org/wiki/Actor_model</a>
* <p/>
* An actor has a well-defined (non-cyclic) life-cycle.
* <pre>
* => NEW (newly created actor) - can't receive messages (yet)
* => STARTED (when 'start' is invoked) - can receive messages
* => SHUT DOWN (when 'exit' is invoked) - can't do anything
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Actor extends TransactionManagement with Logging {
implicit protected val self: Some[Actor] = Some(this)
// Only mutable for RemoteServer in order to maintain identity across nodes
private[akka] var _uuid = UUID.newUuid.toString
// ====================================
// private fields
// ====================================
@volatile private[this] var _isRunning = false
@volatile private[this] var _isSuspended = true
@volatile private[this] var _isShutDown = false
@volatile private[akka] var _isKilled = false
private var _hotswap: Option[PartialFunction[Any, Unit]] = None
private[akka] var _remoteAddress: Option[InetSocketAddress] = None
private[akka] var _linkedActors: Option[HashSet[Actor]] = None
private[akka] var _supervisor: Option[Actor] = None
private[akka] var _replyToAddress: Option[InetSocketAddress] = None
private[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation]
>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala
/**
* Sends a message asynchronously and waits on a future for a reply message.
@ -205,6 +243,7 @@ final class ActorRef private (val actor: Actor) {
def !![T](message: Any): Option[T] = !![T](message, timeout)
/**
<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala
* FIXME document !!!
*/
def !!!(message: Any): Future = {
@ -214,6 +253,18 @@ final class ActorRef private (val actor: Actor) {
} else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
=======
* Holds the reference to the sender of the currently processed message.
* Is None if no sender was specified
* Is Some(Left(Actor)) if sender is an actor
* Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result
*/
protected var replyTo: Option[Either[Actor,CompletableFuture[Any]]] = None
// ====================================
// ==== USER CALLBACKS TO OVERRIDE ====
// ====================================
>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala
/**
* Forwards the message and passes the original sender actor as the sender.
@ -253,11 +304,16 @@ final class ActorRef private (val actor: Actor) {
/**
* Get the dispatcher for this actor.
*/
<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala
def dispatcher: MessageDispatcher = messageDispatcher
=======
protected[akka] var messageDispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/
<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala
def dispatcher_=(md: MessageDispatcher): Unit = synchronized {
if (!_isRunning) {
messageDispatcher.unregister(this)
@ -267,13 +323,20 @@ final class ActorRef private (val actor: Actor) {
} else throw new IllegalArgumentException(
"Can not swap dispatcher for " + toString + " after it has been started")
}
=======
protected[akka] var trapExit: List[Class[_ <: Throwable]] = Nil
>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala
def makeRemote(hostname: String, port: Int): Unit =
if (_isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
else makeRemote(new InetSocketAddress(hostname, port))
=======
protected[akka] var faultHandler: Option[FaultHandlingStrategy] = None
>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
@ -481,10 +544,32 @@ object Actor extends Logging {
* }
* </pre>
*/
<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala
def transactor(body: PartialFunction[Any, Unit]): Actor = new Transactor() {
lifeCycle = Some(LifeCycle(Permanent))
start
def receive: PartialFunction[Any, Unit] = body
=======
def !![T](message: Any, timeout: Long): Option[T] = {
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (_isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None)
val isActiveObject = message.isInstanceOf[Invocation]
if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None)
try {
future.await
} catch {
case e: FutureTimeoutException =>
if (isActiveObject) throw e
else None
}
if (future.exception.isDefined) throw future.exception.get._2
else future.result
}
else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala
}
/**
@ -551,6 +636,7 @@ object Actor extends Logging {
* }
* </pre>
*/
<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala
def spawn(body: => Unit): Unit = {
case object Spawn
new Actor() {
@ -560,6 +646,14 @@ object Actor extends Logging {
case Spawn => body; stop
}
}
=======
def !!![T](message: Any): Future[T] = {
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (_isRunning) {
postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None)
} else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala
}
}
@ -581,6 +675,7 @@ trait Actor extends TransactionManagement with Logging {
implicit protected val self: Option[Actor] = Some(this)
// Only mutable for RemoteServer in order to maintain identity across nodes
<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala
private[akka] var ref: Option[ActorRef] = None
/**
@ -593,6 +688,57 @@ trait Actor extends TransactionManagement with Logging {
// protected fields
// ====================================
=======
/**
* Forwards the message and passes the original sender actor as the sender.
* <p/>
* Works with both '!' and '!!'.
*/
def forward(message: Any)(implicit sender: Some[Actor]) = {
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (_isRunning) {
sender.get.replyTo match {
case Some(Left(actor)) => postMessageToMailbox(message, Some(actor))
case Some(Right(future)) => postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, Some(future))
case _ => throw new IllegalStateException("Can't forward message when initial sender is not an actor")
}
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
}
/**
* Use <code>reply(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* Throws an IllegalStateException if unable to determine what to reply to
*/
protected[this] def reply(message: Any) = if(!reply_?(message)) throw new IllegalStateException(
"\n\tNo sender in scope, can't reply. " +
"\n\tYou have probably used the '!' method to either; " +
"\n\t\t1. Send a message to a remote actor which does not have a contact address." +
"\n\t\t2. Send a message from an instance that is *not* an actor" +
"\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " +
"\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" +
"\n\tthat will be bound by the argument passed to 'reply'." +
"\n\tAlternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.")
/**
* Use <code>reply_?(..)</code> to reply with a message to the original sender of the message currently
* being processed.
* Returns true if reply was sent, and false if unable to determine what to reply to.
*/
protected[this] def reply_?(message: Any) : Boolean = replyTo match {
case Some(Left(actor)) =>
actor ! message
true
case Some(Right(future : Future[Any])) =>
future completeWithResult message
true
case _ =>
false
}
>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala
/**
* TODO: Document replyTo
*/
@ -613,7 +759,18 @@ trait Actor extends TransactionManagement with Logging {
* use a custom name to be able to retrieve the "correct" persisted state
* upon restart, remote restart etc.
*/
<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala
protected var id: String = this.getClass.getName
=======
def dispatcher_=(md: MessageDispatcher): Unit = synchronized {
if (!_isRunning) {
messageDispatcher.unregister(this)
messageDispatcher = md
messageDispatcher.register(this)
} else throw new IllegalArgumentException(
"Can not swap dispatcher for " + toString + " after it has been started")
}
>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala
/**
* User overridable callback/setting.
@ -735,7 +892,16 @@ trait Actor extends TransactionManagement with Logging {
* Mandatory callback method that is called during restart and reinitialization after a server crash.
* To be implemented by subclassing actor.
*/
<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala
protected def postRestart(reason: Throwable) {}
=======
protected[this] def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): T = {
val actor = spawnButDoNotStart[T]
actor.makeRemote(hostname, port)
actor.start
actor
}
>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala
/**
* User overridable callback/setting.
@ -743,7 +909,19 @@ trait Actor extends TransactionManagement with Logging {
* Optional callback method that is called during termination.
* To be implemented by subclassing actor.
*/
<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala
protected def initTransactionalState {}
=======
protected[this] def spawnLink[T <: Actor: Manifest]: T = {
val actor = spawnButDoNotStart[T]
try {
actor.start
} finally {
link(actor)
}
actor
}
>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala
/**
* User overridable callback/setting.
@ -816,10 +994,10 @@ trait Actor extends TransactionManagement with Logging {
RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(sender.get.getId, sender.get)
}
RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, None)
RemoteClient.clientFor(_remoteAddress.get).send[Any](requestBuilder.build, None)
} else {
val invocation = new MessageInvocation(this, message, sender.map(Left(_)), transactionSet.get)
if (_isEventBased) {
if (messageDispatcher.usesActorMailbox) {
_mailbox.add(invocation)
if (_isSuspended) invocation.send
}
@ -827,10 +1005,10 @@ trait Actor extends TransactionManagement with Logging {
}
}
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
timeout: Long,
senderFuture: Option[CompletableFuture]): CompletableFuture = {
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
joinTransaction(message)
if (_remoteAddress.isDefined) {
@ -850,12 +1028,13 @@ trait Actor extends TransactionManagement with Logging {
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
} else {
val future = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture(timeout)
val invocation = new MessageInvocation(this, message, Some(Right(future)), transactionSet.get)
if (_isEventBased) {
else new DefaultCompletableFuture[T](timeout)
val invocation = new MessageInvocation(this, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get)
if (messageDispatcher.usesActorMailbox)
_mailbox.add(invocation)
invocation.send
} else invocation.send
invocation.send
future
}
}
@ -961,18 +1140,15 @@ trait Actor extends TransactionManagement with Logging {
}
}
private def getResultOrThrowException[T](future: Future): Option[T] =
if (future.exception.isDefined) throw future.exception.get._2
else future.result.asInstanceOf[Option[T]]
private def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive)
private val lifeCycles: PartialFunction[Any, Unit] = {
case HotSwap(code) => _hotswap = code
case Restart(reason) => restart(reason)
case Exit(dead, reason) => handleTrapExit(dead, reason)
case Unlink(child) => unlink(child); child.stop
case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
case HotSwap(code) => _hotswap = code
case Restart(reason) => restart(reason)
case Exit(dead, reason) => handleTrapExit(dead, reason)
case Unlink(child) => unlink(child)
case UnlinkAndStop(child) => unlink(child); child.stop
case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
}
private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = {
@ -1005,7 +1181,7 @@ trait Actor extends TransactionManagement with Logging {
// if last temporary actor is gone, then unlink me from supervisor
if (getLinkedActors.isEmpty) {
Actor.log.info("All linked actors have died permanently (they were all configured as TEMPORARY)\n\tshutting down and unlinking supervisor actor as well [%s].", actor.id)
_supervisor.foreach(_ ! Unlink(this))
_supervisor.foreach(_ ! UnlinkAndStop(this))
}
}
}

View file

@ -12,7 +12,7 @@ import net.lag.configgy.{Configgy, ParseException}
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Config extends Logging {
val VERSION = "0.8.1"
val VERSION = "0.9"
// Set Multiverse options for max speed
System.setProperty("org.multiverse.MuliverseConstants.sanityChecks", "false")

View file

@ -11,7 +11,7 @@ import se.scalablesolutions.akka.actor.Actor
* <p/>
* Example usage:
* <pre/>
* val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name")
* val dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name")
* dispatcher
* .withNewThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
@ -25,7 +25,7 @@ import se.scalablesolutions.akka.actor.Actor
* <p/>
* Example usage:
* <pre/>
* MessageDispatcher dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name");
* MessageDispatcher dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name");
* dispatcher
* .withNewThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
@ -40,9 +40,8 @@ import se.scalablesolutions.akka.actor.Actor
*/
object Dispatchers {
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") {
override def register(actor : Actor) = {
if (isShutdown)
init
override def register(actor: Actor) = {
if (isShutdown) init
super.register(actor)
}
}

View file

@ -94,6 +94,8 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
active = false
references.clear
}
def usesActorMailbox = true
def ensureNotActive: Unit = if (active) throw new IllegalStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")

View file

@ -199,6 +199,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
pooledActors.remove(actor)
super.unregister(actor)
}
def usesActorMailbox = true
private def verifyActorsAreOfSameType(newActor: Actor) = {
actorType match {

View file

@ -20,8 +20,8 @@ object Futures {
* }
* </pre>
*/
def future(timeout: Long)(body: => Any): Future = {
val promise = new DefaultCompletableFuture(timeout)
def future[T](timeout: Long)(body: => T): Future[T] = {
val promise = new DefaultCompletableFuture[T](timeout)
try {
promise completeWithResult body
} catch {
@ -30,10 +30,10 @@ object Futures {
promise
}
def awaitAll(futures: List[Future]): Unit = futures.foreach(_.await)
def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)
def awaitOne(futures: List[Future]): Future = {
var future: Option[Future] = None
def awaitOne(futures: List[Future[_]]): Future[_] = {
var future: Option[Future[_]] = None
do {
future = futures.find(_.isCompleted)
} while (future.isEmpty)
@ -41,12 +41,12 @@ object Futures {
}
/*
def awaitEither(f1: Future, f2: Future): Option[Any] = {
def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = {
import Actor.Sender.Self
import Actor.{spawn, actor}
case class Result(res: Option[Any])
val handOff = new SynchronousQueue[Option[Any]]
case class Result(res: Option[T])
val handOff = new SynchronousQueue[Option[T]]
spawn {
try {
println("f1 await")
@ -70,23 +70,23 @@ object Futures {
*/
}
sealed trait Future {
def await
def awaitBlocking
sealed trait Future[T] {
def await : Future[T]
def awaitBlocking : Future[T]
def isCompleted: Boolean
def isExpired: Boolean
def timeoutInNanos: Long
def result: Option[Any]
def result: Option[T]
def exception: Option[Tuple2[AnyRef, Throwable]]
}
trait CompletableFuture extends Future {
def completeWithResult(result: Any)
trait CompletableFuture[T] extends Future[T] {
def completeWithResult(result: T)
def completeWithException(toBlame: AnyRef, exception: Throwable)
}
// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
class DefaultCompletableFuture(timeout: Long) extends CompletableFuture {
class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
private val TIME_UNIT = TimeUnit.MILLISECONDS
def this() = this(0)
@ -95,7 +95,7 @@ class DefaultCompletableFuture(timeout: Long) extends CompletableFuture {
private val _lock = new ReentrantLock
private val _signal = _lock.newCondition
private var _completed: Boolean = _
private var _result: Option[Any] = None
private var _result: Option[T] = None
private var _exception: Option[Tuple2[AnyRef, Throwable]] = None
def await = try {
@ -111,6 +111,7 @@ class DefaultCompletableFuture(timeout: Long) extends CompletableFuture {
wait = wait - (currentTimeInNanos - start)
}
}
this
} finally {
_lock.unlock
}
@ -120,6 +121,7 @@ class DefaultCompletableFuture(timeout: Long) extends CompletableFuture {
while (!_completed) {
_signal.await
}
this
} finally {
_lock.unlock
}
@ -138,7 +140,7 @@ class DefaultCompletableFuture(timeout: Long) extends CompletableFuture {
_lock.unlock
}
def result: Option[Any] = try {
def result: Option[T] = try {
_lock.lock
_result
} finally {
@ -152,7 +154,7 @@ class DefaultCompletableFuture(timeout: Long) extends CompletableFuture {
_lock.unlock
}
def completeWithResult(result: Any) = try {
def completeWithResult(result: T) = try {
_lock.lock
if (!_completed) {
_completed = true

View file

@ -15,7 +15,7 @@ import org.multiverse.commitbarriers.CountDownCommitBarrier
final class MessageInvocation(val receiver: Actor,
val message: Any,
val replyTo : Option[Either[Actor,CompletableFuture]],
val replyTo : Option[Either[Actor,CompletableFuture[Any]]],
val transactionSet: Option[CountDownCommitBarrier]) {
if (receiver eq null) throw new IllegalArgumentException("receiver is null")
@ -68,6 +68,7 @@ trait MessageDispatcher extends Logging {
}
def canBeShutDown: Boolean = references.isEmpty
def isShutdown: Boolean
def usesActorMailbox : Boolean
}
trait MessageDemultiplexer {

View file

@ -37,6 +37,8 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(name: String) extends Abstra
}
def isShutdown = !active
def usesActorMailbox = false
class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {

View file

@ -134,6 +134,8 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
if (fair) true
else nrOfBusyMessages < 100
}
def usesActorMailbox = false
def ensureNotActive: Unit = if (active) throw new IllegalStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")

View file

@ -41,6 +41,8 @@ class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler:
def isShutdown = !active
def usesActorMailbox = false
def shutdown = if (active) {
log.debug("Shutting down ThreadBasedDispatcher [%s]", name)
active = false

View file

@ -85,13 +85,13 @@ object RemoteClient extends Logging {
requestBuilder.setSourcePort(port)
}
RemoteProtocolBuilder.setMessage(message, requestBuilder)
remoteClient.send(requestBuilder.build, None)
remoteClient.send[Any](requestBuilder.build, None)
}
override def postMessageToMailboxAndCreateFutureResultWithTimeout(
override def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
timeout: Long,
senderFuture: Option[CompletableFuture]): CompletableFuture = {
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(className)
@ -173,7 +173,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
val name = "RemoteClient@" + hostname + "::" + port
@volatile private[remote] var isRunning = false
private val futures = new ConcurrentHashMap[Long, CompletableFuture]
private val futures = new ConcurrentHashMap[Long, CompletableFuture[_]]
private val supervisors = new ConcurrentHashMap[String, Actor]
private[remote] val listeners = new ConcurrentSkipListSet[Actor]
@ -217,14 +217,14 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
}
}
def send(request: RemoteRequest, senderFuture: Option[CompletableFuture]): Option[CompletableFuture] = if (isRunning) {
def send[T](request: RemoteRequest, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = if (isRunning) {
if (request.getIsOneWay) {
connection.getChannel.write(request)
None
} else {
futures.synchronized {
val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture(request.getTimeout)
else new DefaultCompletableFuture[T](request.getTimeout)
futures.put(request.getId, futureResult)
connection.getChannel.write(request)
Some(futureResult)
@ -253,7 +253,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteClientPipelineFactory(name: String,
futures: ConcurrentMap[Long, CompletableFuture],
futures: ConcurrentMap[Long, CompletableFuture[_]],
supervisors: ConcurrentMap[String, Actor],
bootstrap: ClientBootstrap,
remoteAddress: SocketAddress,
@ -284,7 +284,7 @@ class RemoteClientPipelineFactory(name: String,
*/
@ChannelHandler.Sharable
class RemoteClientHandler(val name: String,
val futures: ConcurrentMap[Long, CompletableFuture],
val futures: ConcurrentMap[Long, CompletableFuture[_]],
val supervisors: ConcurrentMap[String, Actor],
val bootstrap: ClientBootstrap,
val remoteAddress: SocketAddress,
@ -306,7 +306,7 @@ class RemoteClientHandler(val name: String,
if (result.isInstanceOf[RemoteReply]) {
val reply = result.asInstanceOf[RemoteReply]
log.debug("Remote client received RemoteReply[\n%s]", reply.toString)
val future = futures.get(reply.getId)
val future = futures.get(reply.getId).asInstanceOf[CompletableFuture[Any]]
if (reply.getIsSuccessful) {
val message = RemoteProtocolBuilder.getMessage(reply)
future.completeWithResult(message)

View file

@ -80,7 +80,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
timeout = TIME_OUT
start
private var readerFuture: Option[CompletableFuture] = None
private var readerFuture: Option[CompletableFuture[T]] = None
def receive = {
case Get =>
val ref = dataFlow.value.get
@ -88,11 +88,11 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
reply(ref.get)
else {
readerFuture = replyTo match {
case Some(Right(future)) => Some(future)
case Some(Right(future)) => Some(future.asInstanceOf[CompletableFuture[T]])
case _ => None
}
}
case Set(v) => if (readerFuture.isDefined) readerFuture.get.completeWithResult(v)
case Set(v:T) => if (readerFuture.isDefined) readerFuture.get.completeWithResult(v)
case Exit => exit
}
}

View file

@ -0,0 +1,202 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
import javax.transaction.{TransactionManager, UserTransaction, Transaction => JtaTransaction, SystemException, Status, Synchronization, TransactionSynchronizationRegistry}
import javax.naming.{InitialContext, Context, NamingException}
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.util.Logging
/**
* Detects if there is a UserTransaction or TransactionManager available in the JNDI.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object TransactionContainer extends Logging {
val AKKA_JTA_TRANSACTION_SERVICE_CLASS = "se.scalablesolutions.akka.jta.AtomikosTransactionService"
val DEFAULT_USER_TRANSACTION_NAME = "java:comp/UserTransaction"
val FALLBACK_TRANSACTION_MANAGER_NAMES = "java:comp/TransactionManager" ::
"java:appserver/TransactionManager" ::
"java:pm/TransactionManager" ::
"java:/TransactionManager" :: Nil
val DEFAULT_TRANSACTION_SYNCHRONIZATION_REGISTRY_NAME = "java:comp/TransactionSynchronizationRegistry"
val JTA_PROVIDER = config.getString("akka.jta.provider", "from-jndi")
private var synchronizationRegistry: Option[TransactionSynchronizationRegistry] = None
def apply(tm: Either[Option[UserTransaction], Option[TransactionManager]]) = new TransactionContainer(tm)
def apply(): TransactionContainer =
JTA_PROVIDER match {
case "from-jndi" =>
new TransactionContainer(findUserTransaction match {
case None => Right(findTransactionManager)
case tm => Left(tm)
})
case "atomikos" =>
try {
Class.forName(AKKA_JTA_TRANSACTION_SERVICE_CLASS)
.newInstance.asInstanceOf[TransactionService]
.transactionContainer
} catch {
case e: ClassNotFoundException =>
throw new StmConfigurationException(
"JTA provider defined as 'atomikos', but the AtomikosTransactionService classes can not be found." +
"\n\tPlease make sure you have 'akka-jta' JAR and its dependencies on your classpath.")
}
case _ =>
throw new StmConfigurationException(
"No UserTransaction on TransactionManager could be found in scope." +
"\n\tEither add 'akka-jta' to the classpath or make sure there is a" +
"\n\tTransactionManager or UserTransaction defined in the JNDI.")
}
def findUserTransaction: Option[UserTransaction] = {
val located = createInitialContext.lookup(DEFAULT_USER_TRANSACTION_NAME)
if (located eq null) None
else {
log.info("JTA UserTransaction detected [%s]", located)
Some(located.asInstanceOf[UserTransaction])
}
}
def findSynchronizationRegistry: Option[TransactionSynchronizationRegistry] = synchronized {
if (synchronizationRegistry.isDefined) synchronizationRegistry
else {
val located = createInitialContext.lookup(DEFAULT_TRANSACTION_SYNCHRONIZATION_REGISTRY_NAME)
if (located eq null) None
else {
log.info("JTA TransactionSynchronizationRegistry detected [%s]", located)
synchronizationRegistry = Some(located.asInstanceOf[TransactionSynchronizationRegistry])
synchronizationRegistry
}
}
}
def findTransactionManager: Option[TransactionManager] = {
val context = createInitialContext
val tms = for {
name <- FALLBACK_TRANSACTION_MANAGER_NAMES
tm = context.lookup(name)
if tm ne null
} yield tm
tms match {
case Nil => None
case tm :: _ =>
log.info("JTA TransactionManager detected [%s]", tm)
Some(tm.asInstanceOf[TransactionManager])
}
}
private def createInitialContext = new InitialContext(new java.util.Hashtable)
}
/**
* JTA transaction container holding either a UserTransaction or a TransactionManager.
* <p/>
* The TransactionContainer is created using the factory <tt>val container = TransactionContainer()</tt>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class TransactionContainer private (val tm: Either[Option[UserTransaction], Option[TransactionManager]]) {
def registerSynchronization(sync: Synchronization) = {
TransactionContainer.findSynchronizationRegistry match { // try to use SynchronizationRegistry in JNDI
case Some(registry) =>
registry.asInstanceOf[TransactionSynchronizationRegistry].registerInterposedSynchronization(sync)
case None =>
tm match {
case Right(Some(txMan)) => // try to use TransactionManager
txMan.getTransaction.registerSynchronization(sync)
case _ =>
log.warning("Cannot find TransactionSynchronizationRegistry in JNDI, can't register STM synchronization")
}
}
}
def begin = tm match {
case Left(Some(userTx)) => userTx.begin
case Right(Some(txMan)) => txMan.begin
case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope")
}
def commit = tm match {
case Left(Some(userTx)) => userTx.commit
case Right(Some(txMan)) => txMan.commit
case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope")
}
def rollback = tm match {
case Left(Some(userTx)) => userTx.rollback
case Right(Some(txMan)) => txMan.rollback
case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope")
}
def getStatus = tm match {
case Left(Some(userTx)) => userTx.getStatus
case Right(Some(txMan)) => txMan.getStatus
case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope")
}
def isInExistingTransaction = tm match {
case Left(Some(userTx)) => userTx.getStatus == Status.STATUS_ACTIVE
case Right(Some(txMan)) => txMan.getStatus == Status.STATUS_ACTIVE
case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope")
}
def isRollbackOnly = tm match {
case Left(Some(userTx)) => userTx.getStatus == Status.STATUS_MARKED_ROLLBACK
case Right(Some(txMan)) => txMan.getStatus == Status.STATUS_MARKED_ROLLBACK
case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope")
}
def setRollbackOnly = tm match {
case Left(Some(userTx)) => userTx.setRollbackOnly
case Right(Some(txMan)) => txMan.setRollbackOnly
case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope")
}
def suspend = tm match {
case Right(Some(txMan)) => txMan.suspend
case _ => throw new StmConfigurationException("Does not have a TransactionManager in scope")
}
def resume(tx: JtaTransaction) = tm match {
case Right(Some(txMan)) => txMan.resume(tx)
case _ => throw new StmConfigurationException("Does not have a TransactionManager in scope")
}
}
/**
* STM Synchronization class for synchronizing with the JTA TransactionManager.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class StmSynchronization(tc: TransactionContainer, tx: Transaction) extends Synchronization with Logging {
def beforeCompletion = {
val status = tc.getStatus
if (status != Status.STATUS_ROLLEDBACK &&
status != Status.STATUS_ROLLING_BACK &&
status != Status.STATUS_MARKED_ROLLBACK) {
log.debug("JTA transaction has failed, abort STM transaction")
tx.transaction.foreach(_.abort) // abort multiverse tx
}
}
def afterCompletion(status: Int) = {}
}
/**
* JTA Transaction service.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait TransactionService {
def transactionContainer: TransactionContainer
}

View file

@ -8,9 +8,12 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.TimeUnit
import javax.transaction.{TransactionManager, UserTransaction, Status, TransactionSynchronizationRegistry}
import scala.collection.mutable.HashMap
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.Config._
import org.multiverse.api.{Transaction => MultiverseTransaction, TransactionLifecycleListener, TransactionLifecycleEvent}
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
@ -21,6 +24,26 @@ import org.multiverse.stms.alpha.AlphaStm
class NoTransactionInScopeException extends RuntimeException
class TransactionRetryException(message: String) extends RuntimeException(message)
class StmConfigurationException(message: String) extends RuntimeException(message)
/**
* FIXDOC: document AtomicTemplate
* AtomicTemplate can be used to create atomic blocks from Java code.
* <pre>
* User newUser = new AtomicTemplate[User]() {
* User atomic() {
* ... // create user atomically
* return user;
* }
* }.execute();
* </pre>
*/
trait AtomicTemplate[T] {
def atomic: T
def execute: T = Transaction.Local.atomic {
atomic
}
}
object Transaction {
val idFactory = new AtomicLong(-1L)
@ -253,9 +276,9 @@ object Transaction {
createNewTransactionSet
} else getTransactionSetInScope
val tx = new Transaction
tx.begin
tx.transaction = Some(mtx)
setTransaction(Some(tx))
txSet.registerOnCommitTask(new Runnable() {
def run = tx.commit
})
@ -269,31 +292,47 @@ object Transaction {
}
/**
* The Akka specific Transaction class, keeping track of persistent data structures (as in on-disc).
* The Akka specific Transaction class, keeping track of persistent data structures (as in on-disc)
* and JTA support.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@serializable class Transaction extends Logging {
val JTA_AWARE = config.getBool("akka.stm.jta-aware", false)
val id = Transaction.idFactory.incrementAndGet
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
private[akka] var transaction: Option[MultiverseTransaction] = None
private[this] val persistentStateMap = new HashMap[String, Committable]
private[akka] val depth = new AtomicInteger(0)
val jta: Option[TransactionContainer] =
if (JTA_AWARE) Some(TransactionContainer())
else None
log.trace("Creating %s", toString)
// --- public methods ---------
def begin = synchronized {
jta.foreach { txContainer =>
txContainer.begin
txContainer.registerSynchronization(new StmSynchronization(txContainer, this))
}
}
def commit = synchronized {
log.trace("Committing transaction %s", toString)
Transaction.atomic0 {
persistentStateMap.valuesIterator.foreach(_.commit)
}
status = TransactionStatus.Completed
jta.foreach(_.commit)
}
def abort = synchronized {
log.trace("Aborting transaction %s", toString)
jta.foreach(_.rollback)
}
def isNew = synchronized { status == TransactionStatus.New }
@ -306,6 +345,8 @@ object Transaction {
// --- internal methods ---------
private def isJtaTxActive(status: Int) = status == Status.STATUS_ACTIVE
private[akka] def status_? = status
private[akka] def increment = depth.incrementAndGet
@ -317,17 +358,17 @@ object Transaction {
private[akka] def register(uuid: String, storage: Committable) = persistentStateMap.put(uuid, storage)
private def ensureIsActive = if (status != TransactionStatus.Active)
throw new IllegalStateException(
throw new StmConfigurationException(
"Expected ACTIVE transaction - current status [" + status + "]: " + toString)
private def ensureIsActiveOrAborted =
if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted))
throw new IllegalStateException(
throw new StmConfigurationException(
"Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString)
private def ensureIsActiveOrNew =
if (!(status == TransactionStatus.Active || status == TransactionStatus.New))
throw new IllegalStateException(
throw new StmConfigurationException(
"Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
// For reinitialize transaction after sending it over the wire

View file

@ -40,13 +40,13 @@ object TransactionManagement extends TransactionManagement {
private[akka] def getTransactionSet: CountDownCommitBarrier = {
val option = transactionSet.get
if ((option eq null) || option.isEmpty) throw new IllegalStateException("No Transaction set in scope")
if ((option eq null) || option.isEmpty) throw new StmConfigurationException("No Transaction set in scope")
else option.get
}
private[akka] def getTransaction: Transaction = {
val option = transaction.get
if ((option eq null) || option.isEmpty) throw new IllegalStateException("No Transaction in scope")
if ((option eq null) || option.isEmpty) throw new StmConfigurationException("No Transaction in scope")
option.get
}
}

View file

@ -62,6 +62,8 @@ trait Committable {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Ref {
type Ref[T] = TransactionalRef[T]
def apply[T]() = new Ref[T]
def apply[T](initialValue: T) = new Ref[T](Some(initialValue))
@ -75,7 +77,7 @@ object Ref {
object TransactionalRef {
/**
* An implicit conversion that converts an Option to an Iterable value.
* An implicit conversion that converts a TransactionalRef to an Iterable value.
*/
implicit def ref2Iterable[T](ref: TransactionalRef[T]): Iterable[T] = ref.toList
@ -84,14 +86,6 @@ object TransactionalRef {
def apply[T](initialValue: T) = new TransactionalRef[T](Some(initialValue))
}
/**
* Implements a transactional managed reference.
* Alias to TransactionalRef.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class Ref[T](initialOpt: Option[T] = None) extends TransactionalRef[T](initialOpt)
/**
* Implements a transactional managed reference.
* Alias to Ref.
@ -99,6 +93,8 @@ class Ref[T](initialOpt: Option[T] = None) extends TransactionalRef[T](initialOp
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class TransactionalRef[T](initialOpt: Option[T] = None) extends Transactional {
self =>
import org.multiverse.api.ThreadLocalTransaction._
implicit val txInitName = "TransactionalRef:Init"
@ -149,24 +145,36 @@ class TransactionalRef[T](initialOpt: Option[T] = None) extends Transactional {
ref.isNull
}
def map[B](f: T => B): Option[B] = {
def map[B](f: T => B): TransactionalRef[B] = {
ensureIsInTransaction
if (isEmpty) None else Some(f(ref.get))
if (isEmpty) TransactionalRef[B] else TransactionalRef(f(ref.get))
}
def flatMap[B](f: T => Option[B]): Option[B] = {
def flatMap[B](f: T => TransactionalRef[B]): TransactionalRef[B] = {
ensureIsInTransaction
if (isEmpty) None else f(ref.get)
if (isEmpty) TransactionalRef[B] else f(ref.get)
}
def filter(p: T => Boolean): Option[T] = {
def filter(p: T => Boolean): TransactionalRef[T] = {
ensureIsInTransaction
if (isEmpty || p(ref.get)) Some(ref.get) else None
if (isDefined && p(ref.get)) TransactionalRef(ref.get) else TransactionalRef[T]
}
def foreach(f: T => Unit) {
/**
* Necessary to keep from being implicitly converted to Iterable in for comprehensions.
*/
def withFilter(p: T => Boolean): WithFilter = new WithFilter(p)
class WithFilter(p: T => Boolean) {
def map[B](f: T => B): TransactionalRef[B] = self filter p map f
def flatMap[B](f: T => TransactionalRef[B]): TransactionalRef[B] = self filter p flatMap f
def foreach[U](f: T => U): Unit = self filter p foreach f
def withFilter(q: T => Boolean): WithFilter = new WithFilter(x => p(x) && q(x))
}
def foreach[U](f: T => U): Unit = {
ensureIsInTransaction
if (!isEmpty) f(ref.get)
if (isDefined) f(ref.get)
}
def elements: Iterator[T] = {

View file

@ -326,7 +326,7 @@ object Vector {
@inline
private[stm] def array(elems: AnyRef*) = {
val back = new Array[AnyRef](elems.length)
Array.copy(elems, 0, back, 0, back.length)
Array.copy(elems.toArray, 0, back, 0, back.length)
back
}

View file

@ -57,9 +57,9 @@ class PerformanceSpec extends JUnitSuite {
}
protected def sender : Option[Actor] = replyTo match {
case Some(Left(actor)) => Some(actor)
case _ => None
}
case Some(Left(actor)) => Some(actor)
case _ => None
}
def receive = {
case MeetingCount(i) => {
@ -104,9 +104,9 @@ class PerformanceSpec extends JUnitSuite {
}
protected def sender : Option[Actor] = replyTo match {
case Some(Left(actor)) => Some(actor)
case _ => None
}
case Some(Left(actor)) => Some(actor)
case _ => None
}
override def receive: PartialFunction[Any, Unit] = {
case Meet(from, otherColour) =>

View file

@ -0,0 +1,137 @@
package se.scalablesolutions.akka.stm
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
@RunWith(classOf[JUnitRunner])
class TransactionalRefSpec extends Spec with ShouldMatchers {
describe("A TransactionalRef") {
import Transaction.Local._
it("should optionally accept an initial value") {
val emptyRef = Ref[Int]
val empty = atomic { emptyRef.get }
empty should be(None)
val ref = Ref(3)
val value = atomic { ref.get.get }
value should be(3)
}
it("should be settable using swap") {
val ref = Ref[Int]
atomic { ref.swap(3) }
val value = atomic { ref.get.get }
value should be(3)
}
it("should be changeable using alter") {
val ref = Ref(0)
def increment = atomic {
ref alter (_ + 1)
}
increment
increment
increment
val value = atomic { ref.get.get }
value should be(3)
}
it("should not be changeable using alter if no value has been set") {
val ref = Ref[Int]
def increment = atomic {
ref alter (_ + 1)
}
evaluating { increment } should produce [RuntimeException]
}
it("should be able to be mapped") {
val ref1 = Ref(1)
val ref2 = atomic {
ref1 map (_ + 1)
}
val value1 = atomic { ref1.get.get }
val value2 = atomic { ref2.get.get }
value1 should be(1)
value2 should be(2)
}
it("should be able to be used in a 'foreach' for comprehension") {
val ref = Ref(3)
var result = 0
atomic {
for (value <- ref) {
result += value
}
}
result should be(3)
}
it("should be able to be used in a 'map' for comprehension") {
val ref1 = Ref(1)
val ref2 = atomic {
for (value <- ref1) yield value + 2
}
val value2 = atomic { ref2.get.get }
value2 should be(3)
}
it("should be able to be used in a 'flatMap' for comprehension") {
val ref1 = Ref(1)
val ref2 = Ref(2)
val ref3 = atomic {
for {
value1 <- ref1
value2 <- ref2
} yield value1 + value2
}
val value3 = atomic { ref3.get.get }
value3 should be(3)
}
it("should be able to be used in a 'filter' for comprehension") {
val ref1 = Ref(1)
val refLess2 = atomic {
for (value <- ref1 if value < 2) yield value
}
val optLess2 = atomic { refLess2.get }
val refGreater2 = atomic {
for (value <- ref1 if value > 2) yield value
}
val optGreater2 = atomic { refGreater2.get }
optLess2 should be(Some(1))
optGreater2 should be(None)
}
}
}

View file

@ -0,0 +1,17 @@
package se.scalablesolutions.akka.stm
import org.scalatest.FunSuite
import Transaction.Global._
class TransactionalVectorBugTestSuite extends FunSuite {
test("adding more than 32 items to a Vector shouldn't blow it up") {
atomic {
var v1 = new Vector[Int]()
for (i <- 0 to 31) {
v1 = v1 + i
}
v1 = v1 + 32
}
}
}

View file

@ -0,0 +1,40 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.jta
import javax.transaction.{TransactionManager, SystemException}
import com.atomikos.icatch.jta.{J2eeTransactionManager, J2eeUserTransaction}
import com.atomikos.icatch.config.{TSInitInfo, UserTransactionService, UserTransactionServiceImp}
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.stm.{TransactionService, TransactionContainer}
object AtomikosTransactionService extends AtomikosTransactionService
/**
* Atomikos implementation of the transaction service trait.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class AtomikosTransactionService extends TransactionService with TransactionProtocol {
val JTA_TRANSACTION_TIMEOUT: Int = config.getInt("akka.jta.timeout", 60000) / 1000
private val txService: UserTransactionService = new UserTransactionServiceImp
private val info: TSInitInfo = txService.createTSInitInfo
val transactionContainer: TransactionContainer = TransactionContainer(Right(Some(
try {
txService.init(info)
val tm: TransactionManager = new J2eeTransactionManager
tm.setTransactionTimeout(JTA_TRANSACTION_TIMEOUT)
tm
} catch {
case e => throw new SystemException("Could not create a new Atomikos J2EE Transaction Manager, due to: " + e.toString)
}
)))
// TODO: gracefully shutdown of the TM
//txService.shutdown(false)
}

View file

@ -0,0 +1,239 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.jta
import javax.transaction.{Transaction, Status, TransactionManager, Synchronization}
import se.scalablesolutions.akka.stm.{TransactionService, TransactionContainer}
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.Config._
/**
* The TransactionContext object manages the transactions.
* Can be used as higher-order functional 'atomic blocks' or monadic.
*
* Manages a thread-local stack of TransactionContexts.
* <p/>
* Example usage 1:
* <pre>
* import TransactionContext._
*
* withTxRequired {
* ... // transactional stuff
* }
* // or
* withTxRequiresNew {
* ... // transactional stuff
* }
* </pre>
* Example usage 2:
* <pre>
* for {
* ctx <- TransactionContext.Required
* entity <- updatedEntities
* if !ctx.isRollbackOnly
* } {
* // transactional stuff
* ...
* }
* </pre>
* Example usage 3:
* <pre>
* val users = for {
* ctx <- TransactionContext.Required
* name <- userNames
* } yield {
* // transactional stuff
* ...
* }
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object TransactionContext extends TransactionProtocol with Logging {
implicit val tc = TransactionContainer()
private[TransactionContext] val stack = new scala.util.DynamicVariable(new TransactionContext(tc))
/**
* This method can be used to register a Synchronization instance for participating with the JTA transaction.
* Here is an example of how to add a JPA EntityManager integration.
* <pre>
* TransactionContext.registerSynchronization(new javax.transaction.Synchronization() {
* def beforeCompletion = {
* try {
* val status = tm.getStatus
* if (status != Status.STATUS_ROLLEDBACK &&
* status != Status.STATUS_ROLLING_BACK &&
* status != Status.STATUS_MARKED_ROLLBACK) {
* log.debug("Flushing EntityManager...")
* em.flush // flush EntityManager on success
* }
* } catch {
* case e: javax.transaction.SystemException => throw new RuntimeException(e)
* }
* }
*
* def afterCompletion(status: Int) = {
* val status = tm.getStatus
* if (closeAtTxCompletion) em.close
* if (status == Status.STATUS_ROLLEDBACK ||
* status == Status.STATUS_ROLLING_BACK ||
* status == Status.STATUS_MARKED_ROLLBACK) {
* em.close
* }
* }
* })
* </pre>
* You should also override the 'joinTransaction' and 'handleException' methods.
* See ScalaDoc for these methods in the 'TransactionProtocol' for details.
*/
def registerSynchronization(sync: Synchronization) = synchronization.add(sync)
/**
* Registeres a join transaction function.
* <p/>
* Here is an example on how to integrate with JPA EntityManager.
*
* <pre>
* TransactionContext.registerJoinTransactionFun(() => {
* val em: EntityManager = ... // get the EntityManager
* em.joinTransaction // join JTA transaction
* })
* </pre>
*/
def registerJoinTransactionFun(fn: () => Unit) = joinTransactionFuns.add(fn)
/**
* Handle exception. Can be overriden by concrete transaction service implementation.
* <p/>
* Here is an example on how to handle JPA exceptions.
*
* <pre>
* TransactionContext.registerExceptionNotToRollbackOn(classOf[NoResultException])
* TransactionContext.registerExceptionNotToRollbackOn(classOf[NonUniqueResultException])
* </pre>
*/
def registerExceptionNotToRollbackOn(e: Class[_ <: Exception]) = exceptionsNotToRollbackOn.add(e)
object Required extends TransactionMonad {
def map[T](f: TransactionMonad => T): T = withTxRequired { f(this) }
def flatMap[T](f: TransactionMonad => T): T = withTxRequired { f(this) }
def foreach(f: TransactionMonad => Unit): Unit = withTxRequired { f(this) }
}
object RequiresNew extends TransactionMonad {
def map[T](f: TransactionMonad => T): T = withTxRequiresNew { f(this) }
def flatMap[T](f: TransactionMonad => T): T = withTxRequiresNew { f(this) }
def foreach(f: TransactionMonad => Unit): Unit = withTxRequiresNew { f(this) }
}
object Supports extends TransactionMonad {
def map[T](f: TransactionMonad => T): T = withTxSupports { f(this) }
def flatMap[T](f: TransactionMonad => T): T = withTxSupports { f(this) }
def foreach(f: TransactionMonad => Unit): Unit = withTxSupports { f(this) }
}
object Mandatory extends TransactionMonad {
def map[T](f: TransactionMonad => T): T = withTxMandatory { f(this) }
def flatMap[T](f: TransactionMonad => T): T = withTxMandatory { f(this) }
def foreach(f: TransactionMonad => Unit): Unit = withTxMandatory { f(this) }
}
object Never extends TransactionMonad {
def map[T](f: TransactionMonad => T): T = withTxNever { f(this) }
def flatMap[T](f: TransactionMonad => T): T = withTxNever { f(this) }
def foreach(f: TransactionMonad => Unit): Unit = withTxNever { f(this) }
}
object NoOpTransactionMonad extends TransactionMonad {
def map[T](f: TransactionMonad => T): T = f(this)
def flatMap[T](f: TransactionMonad => T): T = f(this)
def foreach(f: TransactionMonad => Unit): Unit = f(this)
override def filter(f: TransactionMonad => Boolean): TransactionMonad = this
}
private[jta] def setRollbackOnly = current.setRollbackOnly
private[jta] def isRollbackOnly = current.isRollbackOnly
private[jta] def getTransactionContainer: TransactionContainer = current.getTransactionContainer
private[this] def current = stack.value
/**
* Continues with the invocation defined in 'body' with the brand new context define in 'newCtx', the old
* one is put on the stack and will automatically come back in scope when the method exits.
* <p/>
* Suspends and resumes the current JTA transaction.
*/
private[jta] def withNewContext[T](body: => T): T = {
val suspendedTx: Option[Transaction] =
if (getTransactionContainer.isInExistingTransaction) {
log.debug("Suspending TX")
Some(getTransactionContainer.suspend)
} else None
val result = stack.withValue(new TransactionContext(tc)) { body }
if (suspendedTx.isDefined) {
log.debug("Resuming TX")
getTransactionContainer.resume(suspendedTx.get)
}
result
}
}
/**
* Base monad for the transaction monad implementations.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait TransactionMonad {
// -----------------------------
// Monadic definitions
// -----------------------------
def map[T](f: TransactionMonad => T): T
def flatMap[T](f: TransactionMonad => T): T
def foreach(f: TransactionMonad => Unit): Unit
def filter(f: TransactionMonad => Boolean): TransactionMonad =
if (f(this)) this else TransactionContext.NoOpTransactionMonad
// -----------------------------
// JTA Transaction definitions
// -----------------------------
/**
* Marks the current transaction as doomed.
*/
def setRollbackOnly = TransactionContext.setRollbackOnly
/**
* Marks the current transaction as doomed.
*/
def doom = TransactionContext.setRollbackOnly
/**
* Checks if the current transaction is doomed.
*/
def isRollbackOnly = TransactionContext.isRollbackOnly
/**
* Checks that the current transaction is NOT doomed.
*/
def isNotDoomed = !TransactionContext.isRollbackOnly
}
/**
* Transaction context, holds the EntityManager and the TransactionManager.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class TransactionContext(val tc: TransactionContainer) {
def registerSynchronization(sync: Synchronization) = TransactionContext.registerSynchronization(sync)
def setRollbackOnly = tc.setRollbackOnly
def isRollbackOnly: Boolean = tc.getStatus == Status.STATUS_MARKED_ROLLBACK
def getTransactionContainer: TransactionContainer = tc
}

View file

@ -0,0 +1,228 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.jta
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.stm.TransactionContainer
import java.util.{List => JList}
import java.util.concurrent.CopyOnWriteArrayList
import javax.naming.{NamingException, Context, InitialContext}
import javax.transaction.{
Transaction,
UserTransaction,
TransactionManager,
Status,
RollbackException,
SystemException,
Synchronization,
TransactionRequiredException
}
/**
* <p>
* Trait that implements a JTA transaction service that obeys the transaction semantics defined
* in the transaction attribute types for the transacted methods according to the EJB 3 draft specification.
* The aspect handles UserTransaction, TransactionManager instance variable injection thru @javax.ejb.Inject
* (name subject to change as per EJB 3 spec) and method transaction levels thru @javax.ejb.TransactionAttribute.
* </p>
*
* <p>
* This trait should be inherited to implement the getTransactionManager() method that should return a concrete
* javax.transaction.TransactionManager implementation (from JNDI lookup etc).
* </p>
* <p>
* <h3>Transaction attribute semantics</h3>
* (From http://www.kevinboone.com/ejb-transactions.html)
* </p>
* <p>
* <h4>Required</h4>
* 'Required' is probably the best choice (at least initially) for an EJB method that will need to be transactional. In this case, if the method's caller is already part of a transaction, then the EJB method does not create a new transaction, but continues in the same transaction as its caller. If the caller is not in a transaction, then a new transaction is created for the EJB method. If something happens in the EJB that means that a rollback is required, then the extent of the rollback will include everything done in the EJB method, whatever the condition of the caller. If the caller was in a transaction, then everything done by the caller will be rolled back as well. Thus the 'required' attribute ensures that any work done by the EJB will be rolled back if necessary, and if the caller requires a rollback that too will be rolled back.
* </p>
* <p>
* <h4>RequiresNew</h4>
* 'RequiresNew' will be appropriate if you want to ensure that the EJB method is rolled back if necessary, but you don't want the rollback to propogate back to the caller. This attribute results in the creation of a new transaction for the method, regardless of the transactional state of the caller. If the caller was operating in a transaction, then its transaction is suspended until the EJB method completes. Because a new transaction is always created, there may be a slight performance penalty if this attribute is over-used.
* </p>
* <p>
* <h4>Mandatory</h4>
* With the 'mandatory' attribute, the EJB method will not even start unless its caller is in a transaction. It will throw a <code>TransactionRequiredException</code> instead. If the method does start, then it will become part of the transaction of the caller. So if the EJB method signals a failure, the caller will be rolled back as well as the EJB.
* </p>
* <p>
* <h4>Supports</h4>
* With this attribute, the EJB method does not care about the transactional context of its caller. If the caller is part of a transaction, then the EJB method will be part of the same transaction. If the EJB method fails, the transaction will roll back. If the caller is not part of a transaction, then the EJB method will still operate, but a failure will not cause anything to roll back. 'Supports' is probably the attribute that leads to the fastest method call (as there is no transactional overhead), but it can lead to unpredicatable results. If you want a method to be isolated from transactions, that is, to have no effect on the transaction of its caller, then use 'NotSupported' instead.
* </p>
* <p>
* <h4>NotSupported</h4>
* With the 'NotSupported' attribute, the EJB method will never take part in a transaction. If the caller is part of a transaction, then the caller's transaction is suspended. If the EJB method fails, there will be no effect on the caller's transaction, and no rollback will occur. Use this method if you want to ensure that the EJB method will not cause a rollback in its caller. This is appropriate if, for example, the method does something non-essential, such as logging a message. It would not be helpful if the failure of this operation caused a transaction rollback.
* </p>
* <p>
* <h4>Never</h4>
* The 'NotSupported'' attribute will ensure that the EJB method is never called by a transactional caller. Any attempt to do so will result in a <code>RemoteException</code> being thrown. This attribute is probably less useful than `NotSupported', in that NotSupported will assure that the caller's transaction is never affected by the EJB method (just as `Never' does), but will allow a call from a transactional caller if necessary.
* </p>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait TransactionProtocol extends Logging {
protected val synchronization: JList[Synchronization] = new CopyOnWriteArrayList[Synchronization]
protected val joinTransactionFuns: JList[() => Unit] = new CopyOnWriteArrayList[() => Unit]
protected val exceptionsNotToRollbackOn: JList[Class[_ <: Exception]] = new CopyOnWriteArrayList[Class[_ <: Exception]]
def joinTransaction: Unit = {
val it = joinTransactionFuns.iterator
while (it.hasNext) {
val fn = it.next
fn()
}
}
def handleException(tm: TransactionContainer, e: Exception) = {
var rollback = true
val it = joinTransactionFuns.iterator
while (it.hasNext) {
val exception = it.next
if (e.getClass.isAssignableFrom(exception.getClass))
rollback = false
}
if (rollback) tm.setRollbackOnly
throw e
}
/**
* Wraps body in a transaction with REQUIRED semantics.
* <p/>
* Creates a new transaction if no transaction is active in scope, else joins the outer transaction.
*/
def withTxRequired[T](body: => T): T = {
val tm = TransactionContext.getTransactionContainer
if (!isInExistingTransaction(tm)) {
tm.begin
registerSynchronization
try {
joinTransaction
body
} catch {
case e: Exception => handleException(tm, e)
} finally {
commitOrRollBack(tm)
}
} else body
}
/**
* Wraps body in a transaction with REQUIRES_NEW semantics.
* <p/>
* Suspends existing transaction, starts a new transaction, invokes body,
* commits or rollbacks new transaction, finally resumes previous transaction.
*/
def withTxRequiresNew[T](body: => T): T = TransactionContext.withNewContext {
val tm = TransactionContext.getTransactionContainer
tm.begin
registerSynchronization
try {
joinTransaction
body
} catch {
case e: Exception => handleException(tm, e)
} finally {
commitOrRollBack(tm)
}
}
/**
* Wraps body in a transaction with NOT_SUPPORTED semantics.
* <p/>
* Suspends existing transaction, invokes body, resumes transaction.
*/
def withTxNotSupported[T](body: => T): T = TransactionContext.withNewContext {
body
}
/**
* Wraps body in a transaction with SUPPORTS semantics.
* <p/>
* Basicalla a No-op.
*/
def withTxSupports[T](body: => T): T = {
// attach to current if exists else skip -> do nothing
body
}
/**
* Wraps body in a transaction with MANDATORY semantics.
* <p/>
* Throws a TransactionRequiredException if there is no transaction active in scope.
*/
def withTxMandatory[T](body: => T): T = {
if (!isInExistingTransaction(TransactionContext.getTransactionContainer))
throw new TransactionRequiredException("No active TX at method with TX type set to MANDATORY")
body
}
/**
* Wraps body in a transaction with NEVER semantics.
* <p/>
* Throws a SystemException in case of an existing transaction in scope.
*/
def withTxNever[T](body: => T): T = {
if (isInExistingTransaction(TransactionContext.getTransactionContainer))
throw new SystemException("Detected active TX at method with TX type set to NEVER")
body
}
protected def commitOrRollBack(tm: TransactionContainer) = {
if (isInExistingTransaction(tm)) {
if (isRollbackOnly(tm)) {
log.debug("Rolling back TX marked as ROLLBACK_ONLY")
tm.rollback
} else {
log.debug("Committing TX")
tm.commit
}
}
}
// ---------------------------
// Helper methods
// ---------------------------
protected def registerSynchronization = {
val it = synchronization.iterator
while (it.hasNext) TransactionContext.getTransactionContainer.registerSynchronization(it.next)
}
/**
* Checks if a transaction is an existing transaction.
*
* @param tm the transaction manager
* @return boolean
*/
protected def isInExistingTransaction(tm: TransactionContainer): Boolean =
tm.getStatus != Status.STATUS_NO_TRANSACTION
/**
* Checks if current transaction is set to rollback only.
*
* @param tm the transaction manager
* @return boolean
*/
protected def isRollbackOnly(tm: TransactionContainer): Boolean =
tm.getStatus == Status.STATUS_MARKED_ROLLBACK
/**
* A ThreadLocal variable where to store suspended TX and enable pay as you go
* before advice - after advice data sharing in a specific case of requiresNew TX
*/
private val suspendedTx = new ThreadLocal[Transaction] {
override def initialValue = null
}
private def storeInThreadLocal(tx: Transaction) = suspendedTx.set(tx)
private def fetchFromThreadLocal: Option[Transaction] = {
if (suspendedTx != null && suspendedTx.get() != null) Some(suspendedTx.get.asInstanceOf[Transaction])
else None
}
}

View file

@ -1 +0,0 @@
java.naming.factory.initial=com.sun.enterprise.naming.SerialInitContextFactory

View file

@ -85,3 +85,21 @@ class SmallestMailboxFirstIterator(items : List[Actor]) extends InfiniteIterator
items.reduceLeft((actor1, actor2) => actorWithSmallestMailbox(actor1,actor2))
}
}
sealed trait ListenerMessage
case class Listen(listener : Actor) extends ListenerMessage
case class Deafen(listener : Actor) extends ListenerMessage
case class WithListeners(f : Set[Actor] => Unit) extends ListenerMessage
trait Listeners { self : Actor =>
import se.scalablesolutions.akka.actor.Agent
private lazy val listeners = Agent(Set[Actor]())
protected def listenerManagement : PartialFunction[Any,Unit] = {
case Listen(l) => listeners( _ + l)
case Deafen(l) => listeners( _ - l )
case WithListeners(f) => listeners foreach f
}
protected def gossip(msg : Any) = listeners foreach ( _ foreach ( _ ! msg ) )
}

View file

@ -90,6 +90,44 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat
}
}
})
@Test def testListener = verify(new TestActor {
import java.util.concurrent.{ CountDownLatch, TimeUnit }
def test = {
val latch = new CountDownLatch(2)
val num = new AtomicInteger(0)
val i = new Actor with Listeners {
def receive = listenerManagement orElse {
case "foo" => gossip("bar")
}
}
i.start
def newListener = actor {
case "bar" =>
num.incrementAndGet
latch.countDown
}
val a1 = newListener
val a2 = newListener
val a3 = newListener
handle(i,a1,a2,a3) {
i ! Listen(a1)
i ! Listen(a2)
i ! Listen(a3)
i ! Deafen(a3)
i ! "foo"
val done = latch.await(5,TimeUnit.SECONDS)
done must be (true)
num.get must be (2)
}
}
});
}

View file

@ -192,7 +192,7 @@ class MongoStorageSpec extends TestCase {
assertTrue(l.map(_._1).contains("3"))
assertTrue(l.map(_._1).contains("4"))
val JsString(str) = l.filter(_._1 == "2").first._2
val JsString(str) = l.filter(_._1 == "2").head._2
assertEquals(str, "peter")
// trying to fetch for a non-existent transaction will throw

View file

@ -0,0 +1,42 @@
package se.scalablesolutions.akka.persistence.redis
import se.scalablesolutions.akka.actor.Actor
import com.redis._
sealed trait Msg
case class Subscribe(channels: Array[String]) extends Msg
case class Register(callback: PubSubMessage => Any) extends Msg
case class Unsubscribe(channels: Array[String]) extends Msg
case object UnsubscribeAll extends Msg
case class Publish(channel: String, msg: String) extends Msg
class Subscriber(client: RedisClient) extends Actor {
var callback: PubSubMessage => Any = { m => }
def receive = {
case Subscribe(channels) =>
client.subscribe(channels.head, channels.tail: _*)(callback)
reply(true)
case Register(cb) =>
callback = cb
reply(true)
case Unsubscribe(channels) =>
client.unsubscribe(channels.head, channels.tail: _*)
reply(true)
case UnsubscribeAll =>
client.unsubscribe
reply(true)
}
}
class Publisher(client: RedisClient) extends Actor {
def receive = {
case Publish(channel, message) =>
client.publish(channel, message)
reply(true)
}
}

View file

@ -17,10 +17,10 @@ Then to run the sample:
- Set 'export AKKA_HOME=<root of distribution>.
- Run 'sbt console' to start up a REPL (interpreter).
4. In the first REPL you get execute:
- scala> import se.scalablesolutions.akka.sample.chat._
- scala> import sample.chat._
- scala> ChatService.start
5. In the first REPL you get execute:
- scala> import se.scalablesolutions.akka.sample.chat._
5. In the second REPL you get execute:
- scala> import sample.chat._
- scala> Runner.run
6. See the chat simulation run.
7. Run it again to see full speed after first initialization.

View file

@ -33,10 +33,10 @@ Then to run the sample:
- Set 'export AKKA_HOME=<root of distribution>.
- Run 'sbt console' to start up a REPL (interpreter).
2. In the first REPL you get execute:
- scala> import se.scalablesolutions.akka.sample.chat._
- scala> import sample.chat._
- scala> ChatService.start
3. In the first REPL you get execute:
- scala> import se.scalablesolutions.akka.sample.chat._
3. In the second REPL you get execute:
- scala> import sample.chat._
- scala> Runner.run
4. See the chat simulation run.
5. Run it again to see full speed after first initialization.

View file

@ -0,0 +1,103 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>.
*/
package sample.pubsub
import com.redis.{RedisClient, PubSubMessage, S, U, M}
import se.scalablesolutions.akka.persistence.redis._
/**
* Sample Akka application for Redis PubSub
*
* Prerequisite: Need Redis Server running (the version that supports pubsub)
*
* 1. Download redis from http://github.com/antirez/redis
* 2. build using "make"
* 3. Run server as ./redis-server
*
* For running this sample application :-
*
* 1. Open a shell and set AKKA_HOME to the distribution root
* 2. cd $AKKA_HOME
* 3. sbt console
* 4. import sample.pubsub._
* 5. Sub.sub("a", "b") // starts Subscription server & subscribes to channels "a" and "b"
*
* 6. Open up another shell similarly as the above and set AKKA_HOME
* 7. cd $AKKA_HOME
* 8. sbt console
* 9. import sample.pubsub._
* 10. Pub.publish("a", "hello") // the first shell should get the message
* 11. Pub.publish("c", "hi") // the first shell should NOT get this message
*
* 12. Open up a redis-client from where you installed redis and issue a publish command
* ./redis-cli publish a "hi there" ## the first shell should get the message
*
* 13. Go back to the first shell
* 14. Sub.unsub("a") // should unsubscribe the first shell from channel "a"
*
* 15. Study the callback function defined below. It supports many other message formats.
* In the second shell window do the following:
* scala> Pub.publish("b", "+c") // will subscribe the first window to channel "c"
* scala> Pub.publish("b", "+d") // will subscribe the first window to channel "d"
* scala> Pub.publish("b", "-c") // will unsubscribe the first window from channel "c"
* scala> Pub.publish("b", "exit") // will unsubscribe the first window from all channels
*/
object Pub {
println("starting publishing service ..")
val r = new RedisClient("localhost", 6379)
val p = new Publisher(r)
p.start
def publish(channel: String, message: String) = {
p ! Publish(channel, message)
}
}
object Sub {
println("starting subscription service ..")
val r = new RedisClient("localhost", 6379)
val s = new Subscriber(r)
s.start
s ! Register(callback)
def sub(channels: String*) = {
s ! Subscribe(channels.toArray)
}
def unsub(channels: String*) = {
s ! Unsubscribe(channels.toArray)
}
def callback(pubsub: PubSubMessage) = pubsub match {
case S(channel, no) => println("subscribed to " + channel + " and count = " + no)
case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no)
case M(channel, msg) =>
msg match {
// exit will unsubscribe from all channels and stop subscription service
case "exit" =>
println("unsubscribe all ..")
r.unsubscribe
// message "+x" will subscribe to channel x
case x if x startsWith "+" =>
val s: Seq[Char] = x
s match {
case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => }
}
// message "-x" will unsubscribe from channel x
case x if x startsWith "-" =>
val s: Seq[Char] = x
s match {
case Seq('-', rest @ _*) => r.unsubscribe(rest.toString)
}
// other message receive
case x =>
println("received message on channel " + channel + " as : " + x)
}
}
}

View file

@ -6,7 +6,7 @@
<name>Akka Spring Tests in Java</name>
<artifactId>akka-spring-test-java</artifactId>
<groupId>se.scalablesolutions.akka</groupId>
<version>0.8</version>
<version>0.9</version>
<packaging>jar</packaging>
<properties>
@ -147,22 +147,22 @@
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<artifactId>akka-core_2.8.0.Beta1</artifactId>
<version>0.8.1</version>
<version>0.9</version>
</dependency>
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<artifactId>akka-util_2.8.0.Beta1</artifactId>
<version>0.8.1</version>
<version>0.9</version>
</dependency>
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<artifactId>akka-util-java_2.8.0.Beta1</artifactId>
<version>0.8.1</version>
<version>0.9</version>
</dependency>
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<artifactId>akka-spring_2.8.0.Beta1</artifactId>
<version>0.8.1</version>
<version>0.9</version>
<exclusions>
<exclusion>
<groupId>org.springframework</groupId>
@ -263,7 +263,17 @@
<artifactId>h2-lzf</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>jsr166x</groupId>
<artifactId>jsr166x</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jta_1.1_spec</artifactId>
<version>1.1.1</version>
</dependency>
<!-- test -->
<dependency>

View file

@ -1,10 +1,16 @@
package se.scalablesolutions.akka.spring.foo;
import java.io.IOException;
public class Bar implements IBar {
@Override
public String getBar() {
return "bar";
}
public void throwsIOException() throws IOException {
throw new IOException("some IO went wrong");
}
}

View file

@ -15,7 +15,7 @@ public class StatefulPojo {
@inittransactionalstate
public void init() {
if (!isInitialized) {
mapState = TransactionalState.newMap();
mapState = TransactionalState.newMap();
vectorState = TransactionalState.newVector();
refState = TransactionalState.newRef();
isInitialized = true;

View file

@ -35,101 +35,101 @@ import se.scalablesolutions.akka.spring.foo.StatefulPojo;
*/
public class SupervisorConfigurationTest {
private ApplicationContext context = null;
private ApplicationContext context = null;
@Before
public void setUp() {
context = new ClassPathXmlApplicationContext(
"se/scalablesolutions/akka/spring/foo/supervisor-config.xml");
}
@Before
public void setUp() {
context = new ClassPathXmlApplicationContext(
"se/scalablesolutions/akka/spring/foo/supervisor-config.xml");
}
@Test
public void testSupervision() {
// get ActiveObjectConfigurator bean from spring context
ActiveObjectConfigurator myConfigurator = (ActiveObjectConfigurator) context
.getBean("supervision1");
// get ActiveObjects
Foo foo = myConfigurator.getInstance(Foo.class);
assertNotNull(foo);
IBar bar = myConfigurator.getInstance(IBar.class);
assertNotNull(bar);
MyPojo pojo = myConfigurator.getInstance(MyPojo.class);
assertNotNull(pojo);
}
@Test
public void testSupervision() {
// get ActiveObjectConfigurator bean from spring context
ActiveObjectConfigurator myConfigurator = (ActiveObjectConfigurator) context
.getBean("supervision1");
// get ActiveObjects
Foo foo = myConfigurator.getInstance(Foo.class);
assertNotNull(foo);
IBar bar = myConfigurator.getInstance(IBar.class);
assertNotNull(bar);
MyPojo pojo = myConfigurator.getInstance(MyPojo.class);
assertNotNull(pojo);
}
@Test
public void testTransactionalState() {
ActiveObjectConfigurator conf = (ActiveObjectConfigurator) context
.getBean("supervision2");
StatefulPojo stateful = conf.getInstance(StatefulPojo.class);
stateful.setMapState("testTransactionalState", "some map state");
stateful.setVectorState("some vector state");
stateful.setRefState("some ref state");
assertEquals("some map state", stateful
.getMapState("testTransactionalState"));
assertEquals("some vector state", stateful.getVectorState());
assertEquals("some ref state", stateful.getRefState());
}
@Test
public void testTransactionalState() {
ActiveObjectConfigurator conf = (ActiveObjectConfigurator) context
.getBean("supervision2");
StatefulPojo stateful = conf.getInstance(StatefulPojo.class);
stateful.setMapState("testTransactionalState", "some map state");
stateful.setVectorState("some vector state");
stateful.setRefState("some ref state");
assertEquals("some map state", stateful
.getMapState("testTransactionalState"));
assertEquals("some vector state", stateful.getVectorState());
assertEquals("some ref state", stateful.getRefState());
}
@Test
public void testInitTransactionalState() {
StatefulPojo stateful = ActiveObject.newInstance(StatefulPojo.class,
1000, true);
assertTrue("should be inititalized", stateful.isInitialized());
}
@Test
public void testInitTransactionalState() {
StatefulPojo stateful = ActiveObject.newInstance(StatefulPojo.class,
1000, true);
assertTrue("should be inititalized", stateful.isInitialized());
}
@Test
public void testSupervisionWithDispatcher() {
ActiveObjectConfigurator myConfigurator = (ActiveObjectConfigurator) context
.getBean("supervision-with-dispatcher");
// get ActiveObjects
Foo foo = myConfigurator.getInstance(Foo.class);
assertNotNull(foo);
// TODO how to check dispatcher?
}
@Test
public void testRemoteActiveObject() {
new Thread(new Runnable() {
public void run() {
RemoteNode.start();
}
}).start();
try {
Thread.currentThread().sleep(1000);
} catch (Exception e) {
}
Foo instance = ActiveObject.newRemoteInstance(Foo.class, 2000, "localhost", 9999);
System.out.println(instance.foo());
}
@Test
public void testSupervisionWithDispatcher() {
ActiveObjectConfigurator myConfigurator = (ActiveObjectConfigurator) context
.getBean("supervision-with-dispatcher");
// get ActiveObjects
Foo foo = myConfigurator.getInstance(Foo.class);
assertNotNull(foo);
// TODO how to check dispatcher?
}
@Test
public void testRemoteActiveObject() {
new Thread(new Runnable() {
public void run() {
RemoteNode.start();
}
}).start();
try {
Thread.currentThread().sleep(1000);
} catch (Exception e) {
}
Foo instance = ActiveObject.newRemoteInstance(Foo.class, 2000, "localhost", 9999);
System.out.println(instance.foo());
}
@Test
public void testSupervisedRemoteActiveObject() {
new Thread(new Runnable() {
public void run() {
RemoteNode.start();
}
}).start();
try {
Thread.currentThread().sleep(1000);
} catch (Exception e) {
}
@Test
public void testSupervisedRemoteActiveObject() {
new Thread(new Runnable() {
public void run() {
RemoteNode.start();
}
}).start();
try {
Thread.currentThread().sleep(1000);
} catch (Exception e) {
}
ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
conf.configure(
new RestartStrategy(new AllForOne(), 3, 10000, new Class[] { Exception.class }),
new Component[] {
new Component(
Foo.class,
new LifeCycle(new Permanent()),
10000,
new RemoteAddress("localhost", 9999))
}).supervise();
ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
conf.configure(
new RestartStrategy(new AllForOne(), 3, 10000, new Class[] { Exception.class }),
new Component[] {
new Component(
Foo.class,
new LifeCycle(new Permanent()),
10000,
new RemoteAddress("localhost", 9999))
}).supervise();
Foo instance = conf.getInstance(Foo.class);
assertEquals("foo", instance.foo());
}
Foo instance = conf.getInstance(Foo.class);
assertEquals("foo", instance.foo());
}
}

View file

@ -50,7 +50,7 @@ class SupervisionBeanDefinitionParserTest extends Spec with ShouldMatchers {
val supervised = builder.getBeanDefinition.getPropertyValues.getPropertyValue("supervised").getValue.asInstanceOf[List[ActiveObjectProperties]]
assert(supervised != null)
expect(3) { supervised.length }
val iterator = supervised.elements
val iterator = supervised.iterator
expect("foo.bar.Foo") { iterator.next.target }
expect("foo.bar.Bar") { iterator.next.target }
expect("foo.bar.MyPojo") { iterator.next.target }

View file

@ -15,9 +15,9 @@
</log>
<akka>
version = "0.8.1"
version = "0.9"
# FQN to the class doing initial active object/actor
# FQN (Fully Qualified Name) to the class doing initial active object/actor
# supervisor bootstrap, should be defined in default constructor
boot = ["sample.camel.Boot",
"sample.rest.java.Boot",
@ -31,12 +31,20 @@
<stm>
service = on
fair = on # should transactions be fair or non-fair (non fair yield better performance)
max-nr-of-retries = 1000 # max nr of retries of a failing transaction before giving up
timeout = 10000 # transaction timeout; if transaction has not committed within the timeout then it is aborted
distributed = off # not implemented yet
fair = on # should transactions be fair or non-fair (non fair yield better performance)
max-nr-of-retries = 1000 # max nr of retries of a failing transaction before giving up
timeout = 10000 # transaction timeout; if transaction has not committed within the timeout then it is aborted
jta-aware = off # 'on' means that if there JTA Transaction Manager available then the STM will
# begin (or join), commit or rollback the JTA transaction. Default is 'off'.
</stm>
<jta>
provider = "from-jndi" # Options: "from-jndi" (means that Akka will try to detect a TransactionManager in the JNDI)
# "atomikos" (means that Akka will use the Atomikos based JTA impl in 'akka-jta',
# e.g. you need the akka-jta JARs on classpath).
timeout = 60000
</jta>
<rest>
service = on
hostname = "localhost"

View file

@ -1,6 +1,6 @@
project.organization=se.scalablesolutions.akka
project.name=akka
project.version=0.8.1
project.version=0.9
scala.version=2.8.0.Beta1
sbt.version=0.7.3
def.scala.version=2.7.7

View file

@ -23,7 +23,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
lazy val deployPath = info.projectPath / "deploy"
lazy val distPath = info.projectPath / "dist"
override def compileOptions = super.compileOptions ++
override def compileOptions = super.compileOptions ++
Seq("-deprecation", "-Xmigration", "-Xcheckinit", "-Xstrict-warnings", "-Xwarninit", "-encoding", "utf8").map(x => CompileOption(x))
override def javaCompileOptions = JavaCompileOption("-Xlint:unchecked") :: super.javaCompileOptions.toList
@ -42,9 +42,10 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val codehaus_snapshots = "Codehaus Snapshots" at "http://snapshots.repository.codehaus.org"
val jboss = "jBoss" at "http://repository.jboss.org/maven2"
val guiceyfruit = "GuiceyFruit" at "http://guiceyfruit.googlecode.com/svn/repo/releases/"
val google = "google" at "http://google-maven-repository.googlecode.com/svn/repository"
val m2 = "m2" at "http://download.java.net/maven/2"
val google = "Google" at "http://google-maven-repository.googlecode.com/svn/repository"
val java_net = "java.net" at "http://download.java.net/maven/2"
val scala_tools_snapshots = "scala-tools snapshots" at "http://scala-tools.org/repo-snapshots"
val scala_tools_releases = "scala-tools releases" at "http://scala-tools.org/repo-releases"
// ------------------------------------------------------------
// project defintions
@ -60,6 +61,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
lazy val akka_persistence = project("akka-persistence", "akka-persistence", new AkkaPersistenceParentProject(_))
lazy val akka_cluster = project("akka-cluster", "akka-cluster", new AkkaClusterParentProject(_))
lazy val akka_spring = project("akka-spring", "akka-spring", new AkkaSpringProject(_), akka_core)
lazy val akka_jta = project("akka-jta", "akka-jta", new AkkaJTAProject(_), akka_core)
lazy val akka_servlet = project("akka-servlet", "akka-servlet", new AkkaServletProject(_),
akka_core, akka_rest, akka_camel)
lazy val akka_kernel = project("akka-kernel", "akka-kernel", new AkkaKernelProject(_),
@ -108,7 +110,8 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
" dist/akka-persistence-cassandra_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-servlet_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-kernel_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-spring_%s-%s.jar".format(buildScalaVersion, version)
" dist/akka-spring_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-jta_%s-%s.jar".format(buildScalaVersion, version)
)
// ------------------------------------------------------------
@ -148,13 +151,14 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val netty = "org.jboss.netty" % "netty" % "3.2.0.BETA1" % "compile"
val commons_io = "commons-io" % "commons-io" % "1.4" % "compile"
val dispatch_json = "net.databinder" % "dispatch-json_2.8.0.Beta1" % "0.6.6" % "compile"
val dispatch_htdisttp = "net.databinder" % "dispatch-http_2.8.0.Beta1" % "0.6.6" % "compile"
val dispatch_http = "net.databinder" % "dispatch-http_2.8.0.Beta1" % "0.6.6" % "compile"
val sjson = "sjson.json" % "sjson" % "0.5-SNAPSHOT-2.8.Beta1" % "compile"
val sbinary = "sbinary" % "sbinary" % "2.8.0.Beta1-2.8.0.Beta1-0.3.1-SNAPSHOT" % "compile"
val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % "1.2.1" % "compile"
val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % "1.2.1" % "compile"
val voldemort = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile"
val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile"
val jsr166x = "jsr166x" % "jsr166x" % "1.0" % "compile"
val jta_1_1 = "org.apache.geronimo.specs" % "geronimo-jta_1.1_spec" % "1.1.1" % "compile"
// testing
val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test"
val junit = "junit" % "junit" % "4.5" % "test"
@ -236,13 +240,14 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
class AkkaCassandraProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val cassandra = "org.apache.cassandra" % "cassandra" % CASSANDRA_VERSION % "compile"
val slf4j = "org.slf4j" % "slf4j-api" % "1.5.8" % "compile"
val slf4j_log4j = "org.slf4j" % "slf4j-log4j12" % "1.5.8" % "compile"
val log4j = "log4j" % "log4j" % "1.2.15" % "compile"
// testing
val high_scale = "org.apache.cassandra" % "high-scale-lib" % CASSANDRA_VERSION % "test"
val cassandra_clhm = "org.apache.cassandra" % "clhm-production" % CASSANDRA_VERSION % "test"
val commons_coll = "commons-collections" % "commons-collections" % "3.2.1" % "test"
val google_coll = "com.google.collections" % "google-collections" % "1.0" % "test"
val slf4j = "org.slf4j" % "slf4j-api" % "1.5.8" % "test"
val slf4j_log4j = "org.slf4j" % "slf4j-log4j12" % "1.5.8" % "test"
val log4j = "log4j" % "log4j" % "1.2.15" % "test"
override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
}
@ -286,6 +291,14 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val junit = "junit" % "junit" % "4.5" % "test"
}
class AkkaJTAProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val atomikos_transactions = "com.atomikos" % "transactions" % "3.2.3" % "compile"
val atomikos_transactions_jta = "com.atomikos" % "transactions-jta" % "3.2.3" % "compile"
val atomikos_transactions_api = "com.atomikos" % "transactions-api" % "3.2.3" % "compile"
//val atomikos_transactions_util = "com.atomikos" % "transactions-util" % "3.2.3" % "compile"
val jta_spec = "org.apache.geronimo.specs" % "geronimo-jta_1.1_spec" % "1.1.1" % "compile"
}
// examples
class AkkaFunTestProject(info: ProjectInfo) extends DefaultProject(info) {
val jackson_core_asl = "org.codehaus.jackson" % "jackson-core-asl" % "1.2.1" % "compile"
@ -301,6 +314,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
}
class AkkaSampleChatProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath)
class AkkaSamplePubSubProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath)
class AkkaSampleLiftProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) {
val commons_logging = "commons-logging" % "commons-logging" % "1.1.1" % "compile"
@ -335,6 +349,8 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
class AkkaSamplesParentProject(info: ProjectInfo) extends ParentProject(info) {
lazy val akka_sample_chat = project("akka-sample-chat", "akka-sample-chat",
new AkkaSampleChatProject(_), akka_kernel)
lazy val akka_sample_pubsub = project("akka-sample-pubsub", "akka-sample-pubsub",
new AkkaSamplePubSubProject(_), akka_kernel)
lazy val akka_sample_lift = project("akka-sample-lift", "akka-sample-lift",
new AkkaSampleLiftProject(_), akka_kernel)
lazy val akka_sample_rest_java = project("akka-sample-rest-java", "akka-sample-rest-java",

View file

@ -0,0 +1,5 @@
import sbt._
class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
// val surefire = "bryanjswift" % "sbt-surefire-reporting" % "0.0.3-SNAPSHOT"
}