diff --git a/.gitignore b/.gitignore index 69dd6d55c9..45fe24daba 100755 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ *~ *# +project/plugins/project/ project/boot/* */project/build/target */project/boot @@ -32,4 +33,4 @@ tm.out .classpath .idea .scala_dependencies - +multiverse.log \ No newline at end of file diff --git a/akka-camel/src/main/scala/Producer.scala b/akka-camel/src/main/scala/Producer.scala index 2b7d053457..e793794804 100644 --- a/akka-camel/src/main/scala/Producer.scala +++ b/akka-camel/src/main/scala/Producer.scala @@ -162,7 +162,7 @@ trait Producer { self: Actor => */ class ProducerResponseSender( headers: Map[String, Any], - replyTo : Option[Either[Actor,CompletableFuture]], + replyTo : Option[Either[Actor,CompletableFuture[Any]]], producer: Actor) extends Synchronization with Logging { implicit val producerActor = Some(producer) // the response sender diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index e3bd9ef943..f80dd2db42 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -4,6 +4,7 @@ package se.scalablesolutions.akka.actor +import se.scalablesolutions.akka.config.FaultHandlingStrategy import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory} import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future} @@ -28,6 +29,44 @@ object Annotations { val inittransactionalstate = classOf[inittransactionalstate] } +/** + * Configuration factory for Active Objects. + * + * FIXDOC: document ActiveObjectConfiguration + */ +final class ActiveObjectConfiguration { + private[akka] var _timeout: Long = Actor.TIMEOUT + private[akka] var _restartCallbacks: Option[RestartCallbacks] = None + private[akka] var _transactionRequired = false + private[akka] var _host: Option[InetSocketAddress] = None + private[akka] var _messageDispatcher: Option[MessageDispatcher] = None + + def timeout(timeout: Long) : ActiveObjectConfiguration = { + _timeout = timeout + this + } + + def restartCallbacks(pre: String, post: String) : ActiveObjectConfiguration = { + _restartCallbacks = Some(new RestartCallbacks(pre, post)) + this + } + + def makeTransactionRequired() : ActiveObjectConfiguration = { + _transactionRequired = true; + this + } + + def makeRemote(hostname: String, port: Int) : ActiveObjectConfiguration = { + _host = Some(new InetSocketAddress(hostname, port)) + this + } + + def dispatcher(messageDispatcher: MessageDispatcher) : ActiveObjectConfiguration = { + _messageDispatcher = Some(messageDispatcher) + this + } +} + /** * Factory class for creating Active Objects out of plain POJOs and/or POJOs with interfaces. * @@ -35,146 +74,196 @@ object Annotations { */ object ActiveObject { val AKKA_CAMEL_ROUTING_SCHEME = "akka" - private[actor] val AW_PROXY_PREFIX = "$$ProxiedByAW".intern + private[actor] val AW_PROXY_PREFIX = "$$ProxiedByAW".intern def newInstance[T](target: Class[T], timeout: Long): T = newInstance(target, new Dispatcher(false, None), None, timeout) + def newInstance[T](target: Class[T]): T = + newInstance(target, new Dispatcher(false, None), None, Actor.TIMEOUT) + + def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T = + newInstance(intf, target, new Dispatcher(false, None), None, timeout) + + def newInstance[T](intf: Class[T], target: AnyRef): T = + newInstance(intf, target, new Dispatcher(false, None), None, Actor.TIMEOUT) + + def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T = + newInstance(target, new Dispatcher(false, None), Some(new InetSocketAddress(hostname, port)), timeout) + + def newRemoteInstance[T](target: Class[T], hostname: String, port: Int): T = + newInstance(target, new Dispatcher(false, None), Some(new InetSocketAddress(hostname, port)), Actor.TIMEOUT) + + def newInstance[T](target: Class[T], config: ActiveObjectConfiguration): T = { + val actor = new Dispatcher(config._transactionRequired, config._restartCallbacks) + if (config._messageDispatcher.isDefined) { + actor.messageDispatcher = config._messageDispatcher.get + } + newInstance(target, actor, config._host, config._timeout) + } + + def newInstance[T](intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration): T = { + val actor = new Dispatcher(config._transactionRequired, config._restartCallbacks) + if (config._messageDispatcher.isDefined) { + actor.messageDispatcher = config._messageDispatcher.get + } + newInstance(intf, target, actor, config._host, config._timeout) + } + + @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") def newInstance[T](target: Class[T], timeout: Long, restartCallbacks: Option[RestartCallbacks]): T = newInstance(target, new Dispatcher(false, restartCallbacks), None, timeout) - def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T = - newInstance(intf, target, new Dispatcher(false, None), None, timeout) - + @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, restartCallbacks: Option[RestartCallbacks]): T = newInstance(intf, target, new Dispatcher(false, restartCallbacks), None, timeout) + @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean): T = newInstance(target, new Dispatcher(transactionRequired, None), None, timeout) + @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, restartCallbacks: Option[RestartCallbacks]): T = newInstance(target, new Dispatcher(transactionRequired, restartCallbacks), None, timeout) + @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean): T = newInstance(intf, target, new Dispatcher(transactionRequired, None), None, timeout) + @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, restartCallbacks: Option[RestartCallbacks]): T = newInstance(intf, target, new Dispatcher(transactionRequired, restartCallbacks), None, timeout) - def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T = - newInstance(target, new Dispatcher(false, None), Some(new InetSocketAddress(hostname, port)), timeout) - - def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = - newInstance(target, new Dispatcher(false, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout) - + @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int): T = newInstance(intf, target, new Dispatcher(false, None), Some(new InetSocketAddress(hostname, port)), timeout) + @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = newInstance(intf, target, new Dispatcher(false, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout) + @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, hostname: String, port: Int): T = newInstance(target, new Dispatcher(transactionRequired, None), Some(new InetSocketAddress(hostname, port)), timeout) + @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = newInstance(target, new Dispatcher(transactionRequired, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout) + @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, hostname: String, port: Int): T = newInstance(intf, target, new Dispatcher(transactionRequired, None), Some(new InetSocketAddress(hostname, port)), timeout) + @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = newInstance(intf, target, new Dispatcher(transactionRequired, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout) + @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher): T = { val actor = new Dispatcher(false, None) actor.messageDispatcher = dispatcher newInstance(target, actor, None, timeout) } + @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(false, restartCallbacks) actor.messageDispatcher = dispatcher newInstance(target, actor, None, timeout) } + @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher): T = { val actor = new Dispatcher(false, None) actor.messageDispatcher = dispatcher newInstance(intf, target, actor, None, timeout) } + @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(false, restartCallbacks) actor.messageDispatcher = dispatcher newInstance(intf, target, actor, None, timeout) } + @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher): T = { val actor = new Dispatcher(transactionRequired, None) actor.messageDispatcher = dispatcher newInstance(target, actor, None, timeout) } + @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(transactionRequired, restartCallbacks) actor.messageDispatcher = dispatcher newInstance(target, actor, None, timeout) } + @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher): T = { val actor = new Dispatcher(transactionRequired, None) actor.messageDispatcher = dispatcher newInstance(intf, target, actor, None, timeout) } + @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(transactionRequired, restartCallbacks) actor.messageDispatcher = dispatcher newInstance(intf, target, actor, None, timeout) } + @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { val actor = new Dispatcher(false, None) actor.messageDispatcher = dispatcher newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } + @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(false, restartCallbacks) actor.messageDispatcher = dispatcher newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } + @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { val actor = new Dispatcher(false, None) actor.messageDispatcher = dispatcher newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } + @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(false, restartCallbacks) actor.messageDispatcher = dispatcher newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } + @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { val actor = new Dispatcher(transactionRequired, None) actor.messageDispatcher = dispatcher newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } + @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(transactionRequired, restartCallbacks) actor.messageDispatcher = dispatcher newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } + @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { val actor = new Dispatcher(transactionRequired, None) actor.messageDispatcher = dispatcher newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } + @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(transactionRequired, restartCallbacks) actor.messageDispatcher = dispatcher @@ -182,35 +271,94 @@ object ActiveObject { } private[akka] def newInstance[T](target: Class[T], actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = { - val proxy = Proxy.newInstance(target, false, true) + val proxy = Proxy.newInstance(target, false, false) actor.initialize(target, proxy) actor.timeout = timeout + if (remoteAddress.isDefined) { + actor.makeRemote(remoteAddress.get) + } AspectInitRegistry.register(proxy, AspectInit(target, actor, remoteAddress, timeout)) actor.start proxy.asInstanceOf[T] } private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = { - val proxy = Proxy.newInstance(Array(intf), Array(target), false, true) + val proxy = Proxy.newInstance(Array(intf), Array(target), false, false) actor.initialize(target.getClass, target) actor.timeout = timeout + if (remoteAddress.isDefined) { + actor.makeRemote(remoteAddress.get) + } AspectInitRegistry.register(proxy, AspectInit(intf, actor, remoteAddress, timeout)) actor.start proxy.asInstanceOf[T] } -// Jan Kronquist: started work on issue 121 -// def actorFor(obj: AnyRef): Option[Actor] = { -// ActorRegistry.actorsFor(classOf[Dispatcher]).find(a=>a.target == Some(obj)) -// } -// -// def link(supervisor: AnyRef, activeObject: AnyRef) = { -// actorFor(supervisor).get !! Link(actorFor(activeObject).get) -// } -// -// def unlink(supervisor: AnyRef, activeObject: AnyRef) = { -// actorFor(supervisor).get !! Unlink(actorFor(activeObject).get) -// } + /** + * Get the underlying dispatcher actor for the given active object. + */ + def actorFor(obj: AnyRef): Option[Actor] = { + ActorRegistry.actorsFor(classOf[Dispatcher]).find(a=>a.target == Some(obj)) + } + + /** + * Links an other active object to this active object. + * @param supervisor the supervisor active object + * @param supervised the active object to link + */ + def link(supervisor: AnyRef, supervised: AnyRef) = { + val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't link when the supervisor is not an active object")) + val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't link when the supervised is not an active object")) + supervisorActor !! Link(supervisedActor) + } + + /** + * Links an other active object to this active object and sets the fault handling for the supervisor. + * @param supervisor the supervisor active object + * @param supervised the active object to link + * @param handler fault handling strategy + * @param trapExceptions array of exceptions that should be handled by the supervisor + */ + def link(supervisor: AnyRef, supervised: AnyRef, handler: FaultHandlingStrategy, trapExceptions: Array[Class[_ <: Throwable]]) = { + val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't link when the supervisor is not an active object")) + val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't link when the supervised is not an active object")) + supervisorActor.trapExit = trapExceptions.toList + supervisorActor.faultHandler = Some(handler) + supervisorActor !! Link(supervisedActor) + } + + /** + * Unlink the supervised active object from the supervisor. + * @param supervisor the supervisor active object + * @param supervised the active object to unlink + */ + def unlink(supervisor: AnyRef, supervised: AnyRef) = { + val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't unlink when the supervisor is not an active object")) + val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't unlink when the supervised is not an active object")) + supervisorActor !! Unlink(supervisedActor) + } + + /** + * Sets the trap exit for the given supervisor active object. + * @param supervisor the supervisor active object + * @param trapExceptions array of exceptions that should be handled by the supervisor + */ + def trapExit(supervisor: AnyRef, trapExceptions: Array[Class[_ <: Throwable]]) = { + val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't set trap exceptions when the supervisor is not an active object")) + supervisorActor.trapExit = trapExceptions.toList + this + } + + /** + * Sets the fault handling strategy for the given supervisor active object. + * @param supervisor the supervisor active object + * @param handler fault handling strategy + */ + def faultHandler(supervisor: AnyRef, handler: FaultHandlingStrategy) = { + val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't set fault handler when the supervisor is not an active object")) + supervisorActor.faultHandler = Some(handler) + this + } private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor = { val factory = SupervisorFactory(SupervisorConfig(restartStrategy, components)) @@ -227,19 +375,19 @@ private[akka] object AspectInitRegistry { val init = initializations.get(target) initializations.remove(target) init - } + } def register(target: AnyRef, init: AspectInit) = initializations.put(target, init) } private[akka] sealed case class AspectInit( val target: Class[_], - val actor: Dispatcher, + val actor: Dispatcher, val remoteAddress: Option[InetSocketAddress], val timeout: Long) { def this(target: Class[_],actor: Dispatcher, timeout: Long) = this(target, actor, None, timeout) } - + /** * AspectWerkz Aspect that is turning POJOs into Active Object. * Is deployed on a 'per-instance' basis. @@ -260,7 +408,7 @@ private[akka] sealed class ActiveObjectAspect { if (!isInitialized) { val init = AspectInitRegistry.initFor(joinPoint.getThis) target = init.target - actor = init.actor + actor = init.actor remoteAddress = init.remoteAddress timeout = init.timeout isInitialized = true @@ -279,7 +427,7 @@ private[akka] sealed class ActiveObjectAspect { (actor ! Invocation(joinPoint, true, true) ).asInstanceOf[AnyRef] } else { - val result = actor !! Invocation(joinPoint, false, isVoid(rtti)) + val result = actor !! (Invocation(joinPoint, false, isVoid(rtti)), timeout) if (result.isDefined) result.get else throw new IllegalStateException("No result defined for invocation [" + joinPoint + "]") } @@ -314,12 +462,12 @@ private[akka] sealed class ActiveObjectAspect { } } - private def getResultOrThrowException[T](future: Future): Option[T] = + private def getResultOrThrowException[T](future: Future[T]): Option[T] = if (future.exception.isDefined) { val (_, cause) = future.exception.get throw cause - } else future.result.asInstanceOf[Option[T]] - + } else future.result + private def isOneWay(rtti: MethodRtti) = rtti.getMethod.isAnnotationPresent(Annotations.oneway) private def isVoid(rtti: MethodRtti) = rtti.getMethod.getReturnType == java.lang.Void.TYPE @@ -366,11 +514,11 @@ private[akka] sealed class ActiveObjectAspect { } // Jan Kronquist: started work on issue 121 -// private[akka] case class Link(val actor: Actor) +private[akka] case class Link(val actor: Actor) object Dispatcher { val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]() - val ZERO_ITEM_OBJECT_ARRAY = Array[Object]() + val ZERO_ITEM_OBJECT_ARRAY = Array[Object]() } /** @@ -408,7 +556,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op "Could not find post restart method [" + post + "] \nin [" + targetClass.getName + "]. \nIt must have a zero argument definition.") }) } - // See if we have any annotation defined restart callbacks + // See if we have any annotation defined restart callbacks if (!preRestart.isDefined) preRestart = methods.find(m => m.isAnnotationPresent(Annotations.prerestart)) if (!postRestart.isDefined) postRestart = methods.find(m => m.isAnnotationPresent(Annotations.postrestart)) @@ -421,7 +569,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op if (preRestart.isDefined) preRestart.get.setAccessible(true) if (postRestart.isDefined) postRestart.get.setAccessible(true) - + // see if we have a method annotated with @inittransactionalstate, if so invoke it initTxState = methods.find(m => m.isAnnotationPresent(Annotations.inittransactionalstate)) if (initTxState.isDefined && initTxState.get.getParameterTypes.length != 0) throw new IllegalStateException("Method annotated with @inittransactionalstate must have a zero argument definition") @@ -434,8 +582,8 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op if (isOneWay) joinPoint.proceed else reply(joinPoint.proceed) // Jan Kronquist: started work on issue 121 -// case Link(target) => -// link(target) + case Link(target) => link(target) + case Unlink(target) => unlink(target) case unexpected => throw new IllegalStateException("Unexpected message [" + unexpected + "] sent to [" + this + "]") } @@ -486,6 +634,6 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op if (!unserializable && hasMutableArgument) { val copyOfArgs = Serializer.Java.deepClone(args) joinPoint.getRtti.asInstanceOf[MethodRtti].setParameterValues(copyOfArgs.asInstanceOf[Array[AnyRef]]) - } + } } } diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index db38780c5a..0800f67065 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -52,6 +52,7 @@ case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifeCycleMe case class Restart(reason: Throwable) extends LifeCycleMessage case class Exit(dead: Actor, killer: Throwable) extends LifeCycleMessage case class Unlink(child: Actor) extends LifeCycleMessage +case class UnlinkAndStop(child: Actor) extends LifeCycleMessage case object Kill extends LifeCycleMessage class ActorKilledException private[akka](message: String) extends RuntimeException(message) @@ -187,6 +188,43 @@ final class ActorRef private (val actor: Actor) { } else throw new IllegalStateException( "Actor has not been started, you need to invoke 'actor.start' before using it") } +<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala +======= +} + +/** + * Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model': + * http://en.wikipedia.org/wiki/Actor_model + *
+ * An actor has a well-defined (non-cyclic) life-cycle. + *+ * => NEW (newly created actor) - can't receive messages (yet) + * => STARTED (when 'start' is invoked) - can receive messages + * => SHUT DOWN (when 'exit' is invoked) - can't do anything + *+ * + * @author Jonas Bonér + */ +trait Actor extends TransactionManagement with Logging { + implicit protected val self: Some[Actor] = Some(this) + // Only mutable for RemoteServer in order to maintain identity across nodes + private[akka] var _uuid = UUID.newUuid.toString + + // ==================================== + // private fields + // ==================================== + + @volatile private[this] var _isRunning = false + @volatile private[this] var _isSuspended = true + @volatile private[this] var _isShutDown = false + @volatile private[akka] var _isKilled = false + private var _hotswap: Option[PartialFunction[Any, Unit]] = None + private[akka] var _remoteAddress: Option[InetSocketAddress] = None + private[akka] var _linkedActors: Option[HashSet[Actor]] = None + private[akka] var _supervisor: Option[Actor] = None + private[akka] var _replyToAddress: Option[InetSocketAddress] = None + private[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation] +>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala /** * Sends a message asynchronously and waits on a future for a reply message. @@ -205,6 +243,7 @@ final class ActorRef private (val actor: Actor) { def !: Option[T] = ! /** +<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala * FIXME document !!! */ def !!!(message: Any): Future = { @@ -214,6 +253,18 @@ final class ActorRef private (val actor: Actor) { } else throw new IllegalStateException( "Actor has not been started, you need to invoke 'actor.start' before using it") } +======= + * Holds the reference to the sender of the currently processed message. + * Is None if no sender was specified + * Is Some(Left(Actor)) if sender is an actor + * Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result + */ + protected var replyTo: Option[Either[Actor,CompletableFuture[Any]]] = None + + // ==================================== + // ==== USER CALLBACKS TO OVERRIDE ==== + // ==================================== +>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala /** * Forwards the message and passes the original sender actor as the sender. @@ -253,11 +304,16 @@ final class ActorRef private (val actor: Actor) { /** * Get the dispatcher for this actor. */ +<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala def dispatcher: MessageDispatcher = messageDispatcher +======= + protected[akka] var messageDispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher +>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala /** * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. */ +<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala def dispatcher_=(md: MessageDispatcher): Unit = synchronized { if (!_isRunning) { messageDispatcher.unregister(this) @@ -267,13 +323,20 @@ final class ActorRef private (val actor: Actor) { } else throw new IllegalArgumentException( "Can not swap dispatcher for " + toString + " after it has been started") } +======= + protected[akka] var trapExit: List[Class[_ <: Throwable]] = Nil +>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala /** * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. */ +<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala def makeRemote(hostname: String, port: Int): Unit = if (_isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") else makeRemote(new InetSocketAddress(hostname, port)) +======= + protected[akka] var faultHandler: Option[FaultHandlingStrategy] = None +>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala /** * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. @@ -481,10 +544,32 @@ object Actor extends Logging { * } * */ +<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala def transactor(body: PartialFunction[Any, Unit]): Actor = new Transactor() { lifeCycle = Some(LifeCycle(Permanent)) start def receive: PartialFunction[Any, Unit] = body +======= + def !: Option[T] = { + if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") + if (_isRunning) { + val future = postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None) + val isActiveObject = message.isInstanceOf[Invocation] + if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None) + try { + future.await + } catch { + case e: FutureTimeoutException => + if (isActiveObject) throw e + else None + } + + if (future.exception.isDefined) throw future.exception.get._2 + else future.result + } + else throw new IllegalStateException( + "Actor has not been started, you need to invoke 'actor.start' before using it") +>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala } /** @@ -551,6 +636,7 @@ object Actor extends Logging { * } * */ +<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala def spawn(body: => Unit): Unit = { case object Spawn new Actor() { @@ -560,6 +646,14 @@ object Actor extends Logging { case Spawn => body; stop } } +======= + def !!: Future[T] = { + if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") + if (_isRunning) { + postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None) + } else throw new IllegalStateException( + "Actor has not been started, you need to invoke 'actor.start' before using it") +>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala } } @@ -581,6 +675,7 @@ trait Actor extends TransactionManagement with Logging { implicit protected val self: Option[Actor] = Some(this) // Only mutable for RemoteServer in order to maintain identity across nodes +<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala private[akka] var ref: Option[ActorRef] = None /** @@ -593,6 +688,57 @@ trait Actor extends TransactionManagement with Logging { // protected fields // ==================================== +======= + /** + * Forwards the message and passes the original sender actor as the sender. + * + * Works with both '!' and '!!'. + */ + def forward(message: Any)(implicit sender: Some[Actor]) = { + if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") + if (_isRunning) { + sender.get.replyTo match { + case Some(Left(actor)) => postMessageToMailbox(message, Some(actor)) + case Some(Right(future)) => postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, Some(future)) + case _ => throw new IllegalStateException("Can't forward message when initial sender is not an actor") + } + } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") + } + + /** + * Use
reply(..) to reply with a message to the original sender of the message currently
+ * being processed.
+ * Throws an IllegalStateException if unable to determine what to reply to
+ */
+ protected[this] def reply(message: Any) = if(!reply_?(message)) throw new IllegalStateException(
+ "\n\tNo sender in scope, can't reply. " +
+ "\n\tYou have probably used the '!' method to either; " +
+ "\n\t\t1. Send a message to a remote actor which does not have a contact address." +
+ "\n\t\t2. Send a message from an instance that is *not* an actor" +
+ "\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " +
+ "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" +
+ "\n\tthat will be bound by the argument passed to 'reply'." +
+ "\n\tAlternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.")
+
+ /**
+ * Use reply_?(..) to reply with a message to the original sender of the message currently
+ * being processed.
+ * Returns true if reply was sent, and false if unable to determine what to reply to.
+ */
+ protected[this] def reply_?(message: Any) : Boolean = replyTo match {
+ case Some(Left(actor)) =>
+ actor ! message
+ true
+
+ case Some(Right(future : Future[Any])) =>
+ future completeWithResult message
+ true
+
+ case _ =>
+ false
+ }
+
+>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala
/**
* TODO: Document replyTo
*/
@@ -613,7 +759,18 @@ trait Actor extends TransactionManagement with Logging {
* use a custom name to be able to retrieve the "correct" persisted state
* upon restart, remote restart etc.
*/
+<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala
protected var id: String = this.getClass.getName
+=======
+ def dispatcher_=(md: MessageDispatcher): Unit = synchronized {
+ if (!_isRunning) {
+ messageDispatcher.unregister(this)
+ messageDispatcher = md
+ messageDispatcher.register(this)
+ } else throw new IllegalArgumentException(
+ "Can not swap dispatcher for " + toString + " after it has been started")
+ }
+>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala
/**
* User overridable callback/setting.
@@ -735,7 +892,16 @@ trait Actor extends TransactionManagement with Logging {
* Mandatory callback method that is called during restart and reinitialization after a server crash.
* To be implemented by subclassing actor.
*/
+<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala
protected def postRestart(reason: Throwable) {}
+=======
+ protected[this] def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): T = {
+ val actor = spawnButDoNotStart[T]
+ actor.makeRemote(hostname, port)
+ actor.start
+ actor
+ }
+>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala
/**
* User overridable callback/setting.
@@ -743,7 +909,19 @@ trait Actor extends TransactionManagement with Logging {
* Optional callback method that is called during termination.
* To be implemented by subclassing actor.
*/
+<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala
protected def initTransactionalState {}
+=======
+ protected[this] def spawnLink[T <: Actor: Manifest]: T = {
+ val actor = spawnButDoNotStart[T]
+ try {
+ actor.start
+ } finally {
+ link(actor)
+ }
+ actor
+ }
+>>>>>>> master:akka-core/src/main/scala/actor/Actor.scala
/**
* User overridable callback/setting.
@@ -816,10 +994,10 @@ trait Actor extends TransactionManagement with Logging {
RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(sender.get.getId, sender.get)
}
RemoteProtocolBuilder.setMessage(message, requestBuilder)
- RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, None)
+ RemoteClient.clientFor(_remoteAddress.get).send[Any](requestBuilder.build, None)
} else {
val invocation = new MessageInvocation(this, message, sender.map(Left(_)), transactionSet.get)
- if (_isEventBased) {
+ if (messageDispatcher.usesActorMailbox) {
_mailbox.add(invocation)
if (_isSuspended) invocation.send
}
@@ -827,10 +1005,10 @@ trait Actor extends TransactionManagement with Logging {
}
}
- protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
+ protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
timeout: Long,
- senderFuture: Option[CompletableFuture]): CompletableFuture = {
+ senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
joinTransaction(message)
if (_remoteAddress.isDefined) {
@@ -850,12 +1028,13 @@ trait Actor extends TransactionManagement with Logging {
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
} else {
val future = if (senderFuture.isDefined) senderFuture.get
- else new DefaultCompletableFuture(timeout)
- val invocation = new MessageInvocation(this, message, Some(Right(future)), transactionSet.get)
- if (_isEventBased) {
+ else new DefaultCompletableFuture[T](timeout)
+ val invocation = new MessageInvocation(this, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get)
+
+ if (messageDispatcher.usesActorMailbox)
_mailbox.add(invocation)
- invocation.send
- } else invocation.send
+
+ invocation.send
future
}
}
@@ -961,18 +1140,15 @@ trait Actor extends TransactionManagement with Logging {
}
}
- private def getResultOrThrowException[T](future: Future): Option[T] =
- if (future.exception.isDefined) throw future.exception.get._2
- else future.result.asInstanceOf[Option[T]]
-
private def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive)
private val lifeCycles: PartialFunction[Any, Unit] = {
- case HotSwap(code) => _hotswap = code
- case Restart(reason) => restart(reason)
- case Exit(dead, reason) => handleTrapExit(dead, reason)
- case Unlink(child) => unlink(child); child.stop
- case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
+ case HotSwap(code) => _hotswap = code
+ case Restart(reason) => restart(reason)
+ case Exit(dead, reason) => handleTrapExit(dead, reason)
+ case Unlink(child) => unlink(child)
+ case UnlinkAndStop(child) => unlink(child); child.stop
+ case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
}
private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = {
@@ -1005,7 +1181,7 @@ trait Actor extends TransactionManagement with Logging {
// if last temporary actor is gone, then unlink me from supervisor
if (getLinkedActors.isEmpty) {
Actor.log.info("All linked actors have died permanently (they were all configured as TEMPORARY)\n\tshutting down and unlinking supervisor actor as well [%s].", actor.id)
- _supervisor.foreach(_ ! Unlink(this))
+ _supervisor.foreach(_ ! UnlinkAndStop(this))
}
}
}
diff --git a/akka-core/src/main/scala/config/Config.scala b/akka-core/src/main/scala/config/Config.scala
index 68cd75d825..f9a1035a7c 100644
--- a/akka-core/src/main/scala/config/Config.scala
+++ b/akka-core/src/main/scala/config/Config.scala
@@ -12,7 +12,7 @@ import net.lag.configgy.{Configgy, ParseException}
* @author Jonas Bonér
*/
object Config extends Logging {
- val VERSION = "0.8.1"
+ val VERSION = "0.9"
// Set Multiverse options for max speed
System.setProperty("org.multiverse.MuliverseConstants.sanityChecks", "false")
diff --git a/akka-core/src/main/scala/dispatch/Dispatchers.scala b/akka-core/src/main/scala/dispatch/Dispatchers.scala
index 49d9c624b6..2030d2026e 100644
--- a/akka-core/src/main/scala/dispatch/Dispatchers.scala
+++ b/akka-core/src/main/scala/dispatch/Dispatchers.scala
@@ -11,7 +11,7 @@ import se.scalablesolutions.akka.actor.Actor
*
* Example usage:
*
- * val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name")
+ * val dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name")
* dispatcher
* .withNewThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
@@ -25,7 +25,7 @@ import se.scalablesolutions.akka.actor.Actor
*
* Example usage:
*
- * MessageDispatcher dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name");
+ * MessageDispatcher dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name");
* dispatcher
* .withNewThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
@@ -40,9 +40,8 @@ import se.scalablesolutions.akka.actor.Actor
*/
object Dispatchers {
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") {
- override def register(actor : Actor) = {
- if (isShutdown)
- init
+ override def register(actor: Actor) = {
+ if (isShutdown) init
super.register(actor)
}
}
diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
index 705c3ee142..0c624c2e3a 100644
--- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
@@ -94,6 +94,8 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
active = false
references.clear
}
+
+ def usesActorMailbox = true
def ensureNotActive: Unit = if (active) throw new IllegalStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")
diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
index a96f5c5e76..28fe624b86 100644
--- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
@@ -199,6 +199,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
pooledActors.remove(actor)
super.unregister(actor)
}
+
+ def usesActorMailbox = true
private def verifyActorsAreOfSameType(newActor: Actor) = {
actorType match {
diff --git a/akka-core/src/main/scala/dispatch/Future.scala b/akka-core/src/main/scala/dispatch/Future.scala
index 0bf9723e31..7fe0c4ab6a 100644
--- a/akka-core/src/main/scala/dispatch/Future.scala
+++ b/akka-core/src/main/scala/dispatch/Future.scala
@@ -20,8 +20,8 @@ object Futures {
* }
*
*/
- def future(timeout: Long)(body: => Any): Future = {
- val promise = new DefaultCompletableFuture(timeout)
+ def future[T](timeout: Long)(body: => T): Future[T] = {
+ val promise = new DefaultCompletableFuture[T](timeout)
try {
promise completeWithResult body
} catch {
@@ -30,10 +30,10 @@ object Futures {
promise
}
- def awaitAll(futures: List[Future]): Unit = futures.foreach(_.await)
+ def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)
- def awaitOne(futures: List[Future]): Future = {
- var future: Option[Future] = None
+ def awaitOne(futures: List[Future[_]]): Future[_] = {
+ var future: Option[Future[_]] = None
do {
future = futures.find(_.isCompleted)
} while (future.isEmpty)
@@ -41,12 +41,12 @@ object Futures {
}
/*
- def awaitEither(f1: Future, f2: Future): Option[Any] = {
+ def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = {
import Actor.Sender.Self
import Actor.{spawn, actor}
- case class Result(res: Option[Any])
- val handOff = new SynchronousQueue[Option[Any]]
+ case class Result(res: Option[T])
+ val handOff = new SynchronousQueue[Option[T]]
spawn {
try {
println("f1 await")
@@ -70,23 +70,23 @@ object Futures {
*/
}
-sealed trait Future {
- def await
- def awaitBlocking
+sealed trait Future[T] {
+ def await : Future[T]
+ def awaitBlocking : Future[T]
def isCompleted: Boolean
def isExpired: Boolean
def timeoutInNanos: Long
- def result: Option[Any]
+ def result: Option[T]
def exception: Option[Tuple2[AnyRef, Throwable]]
}
-trait CompletableFuture extends Future {
- def completeWithResult(result: Any)
+trait CompletableFuture[T] extends Future[T] {
+ def completeWithResult(result: T)
def completeWithException(toBlame: AnyRef, exception: Throwable)
}
// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
-class DefaultCompletableFuture(timeout: Long) extends CompletableFuture {
+class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
private val TIME_UNIT = TimeUnit.MILLISECONDS
def this() = this(0)
@@ -95,7 +95,7 @@ class DefaultCompletableFuture(timeout: Long) extends CompletableFuture {
private val _lock = new ReentrantLock
private val _signal = _lock.newCondition
private var _completed: Boolean = _
- private var _result: Option[Any] = None
+ private var _result: Option[T] = None
private var _exception: Option[Tuple2[AnyRef, Throwable]] = None
def await = try {
@@ -111,6 +111,7 @@ class DefaultCompletableFuture(timeout: Long) extends CompletableFuture {
wait = wait - (currentTimeInNanos - start)
}
}
+ this
} finally {
_lock.unlock
}
@@ -120,6 +121,7 @@ class DefaultCompletableFuture(timeout: Long) extends CompletableFuture {
while (!_completed) {
_signal.await
}
+ this
} finally {
_lock.unlock
}
@@ -138,7 +140,7 @@ class DefaultCompletableFuture(timeout: Long) extends CompletableFuture {
_lock.unlock
}
- def result: Option[Any] = try {
+ def result: Option[T] = try {
_lock.lock
_result
} finally {
@@ -152,7 +154,7 @@ class DefaultCompletableFuture(timeout: Long) extends CompletableFuture {
_lock.unlock
}
- def completeWithResult(result: Any) = try {
+ def completeWithResult(result: T) = try {
_lock.lock
if (!_completed) {
_completed = true
diff --git a/akka-core/src/main/scala/dispatch/Reactor.scala b/akka-core/src/main/scala/dispatch/Reactor.scala
index f9db74190f..3eecbef0f3 100644
--- a/akka-core/src/main/scala/dispatch/Reactor.scala
+++ b/akka-core/src/main/scala/dispatch/Reactor.scala
@@ -15,7 +15,7 @@ import org.multiverse.commitbarriers.CountDownCommitBarrier
final class MessageInvocation(val receiver: Actor,
val message: Any,
- val replyTo : Option[Either[Actor,CompletableFuture]],
+ val replyTo : Option[Either[Actor,CompletableFuture[Any]]],
val transactionSet: Option[CountDownCommitBarrier]) {
if (receiver eq null) throw new IllegalArgumentException("receiver is null")
@@ -68,6 +68,7 @@ trait MessageDispatcher extends Logging {
}
def canBeShutDown: Boolean = references.isEmpty
def isShutdown: Boolean
+ def usesActorMailbox : Boolean
}
trait MessageDemultiplexer {
diff --git a/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala
index 15af513d62..fc99cf88d2 100644
--- a/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala
@@ -37,6 +37,8 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(name: String) extends Abstra
}
def isShutdown = !active
+
+ def usesActorMailbox = false
class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
diff --git a/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala
index 941e701410..3f33d4ffc0 100644
--- a/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala
@@ -134,6 +134,8 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
if (fair) true
else nrOfBusyMessages < 100
}
+
+ def usesActorMailbox = false
def ensureNotActive: Unit = if (active) throw new IllegalStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")
diff --git a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala
index 8b1463f655..fbfffc999e 100644
--- a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala
@@ -41,6 +41,8 @@ class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler:
def isShutdown = !active
+ def usesActorMailbox = false
+
def shutdown = if (active) {
log.debug("Shutting down ThreadBasedDispatcher [%s]", name)
active = false
diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala
index 81d5591fbb..2557c33d02 100644
--- a/akka-core/src/main/scala/remote/RemoteClient.scala
+++ b/akka-core/src/main/scala/remote/RemoteClient.scala
@@ -85,13 +85,13 @@ object RemoteClient extends Logging {
requestBuilder.setSourcePort(port)
}
RemoteProtocolBuilder.setMessage(message, requestBuilder)
- remoteClient.send(requestBuilder.build, None)
+ remoteClient.send[Any](requestBuilder.build, None)
}
- override def postMessageToMailboxAndCreateFutureResultWithTimeout(
+ override def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
timeout: Long,
- senderFuture: Option[CompletableFuture]): CompletableFuture = {
+ senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(className)
@@ -173,7 +173,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
val name = "RemoteClient@" + hostname + "::" + port
@volatile private[remote] var isRunning = false
- private val futures = new ConcurrentHashMap[Long, CompletableFuture]
+ private val futures = new ConcurrentHashMap[Long, CompletableFuture[_]]
private val supervisors = new ConcurrentHashMap[String, Actor]
private[remote] val listeners = new ConcurrentSkipListSet[Actor]
@@ -217,14 +217,14 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
}
}
- def send(request: RemoteRequest, senderFuture: Option[CompletableFuture]): Option[CompletableFuture] = if (isRunning) {
+ def send[T](request: RemoteRequest, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = if (isRunning) {
if (request.getIsOneWay) {
connection.getChannel.write(request)
None
} else {
futures.synchronized {
val futureResult = if (senderFuture.isDefined) senderFuture.get
- else new DefaultCompletableFuture(request.getTimeout)
+ else new DefaultCompletableFuture[T](request.getTimeout)
futures.put(request.getId, futureResult)
connection.getChannel.write(request)
Some(futureResult)
@@ -253,7 +253,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
* @author Jonas Bonér
*/
class RemoteClientPipelineFactory(name: String,
- futures: ConcurrentMap[Long, CompletableFuture],
+ futures: ConcurrentMap[Long, CompletableFuture[_]],
supervisors: ConcurrentMap[String, Actor],
bootstrap: ClientBootstrap,
remoteAddress: SocketAddress,
@@ -284,7 +284,7 @@ class RemoteClientPipelineFactory(name: String,
*/
@ChannelHandler.Sharable
class RemoteClientHandler(val name: String,
- val futures: ConcurrentMap[Long, CompletableFuture],
+ val futures: ConcurrentMap[Long, CompletableFuture[_]],
val supervisors: ConcurrentMap[String, Actor],
val bootstrap: ClientBootstrap,
val remoteAddress: SocketAddress,
@@ -306,7 +306,7 @@ class RemoteClientHandler(val name: String,
if (result.isInstanceOf[RemoteReply]) {
val reply = result.asInstanceOf[RemoteReply]
log.debug("Remote client received RemoteReply[\n%s]", reply.toString)
- val future = futures.get(reply.getId)
+ val future = futures.get(reply.getId).asInstanceOf[CompletableFuture[Any]]
if (reply.getIsSuccessful) {
val message = RemoteProtocolBuilder.getMessage(reply)
future.completeWithResult(message)
diff --git a/akka-core/src/main/scala/stm/DataFlowVariable.scala b/akka-core/src/main/scala/stm/DataFlowVariable.scala
index 7b2084aec6..332ae5c14e 100644
--- a/akka-core/src/main/scala/stm/DataFlowVariable.scala
+++ b/akka-core/src/main/scala/stm/DataFlowVariable.scala
@@ -80,7 +80,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
timeout = TIME_OUT
start
- private var readerFuture: Option[CompletableFuture] = None
+ private var readerFuture: Option[CompletableFuture[T]] = None
def receive = {
case Get =>
val ref = dataFlow.value.get
@@ -88,11 +88,11 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
reply(ref.get)
else {
readerFuture = replyTo match {
- case Some(Right(future)) => Some(future)
+ case Some(Right(future)) => Some(future.asInstanceOf[CompletableFuture[T]])
case _ => None
}
}
- case Set(v) => if (readerFuture.isDefined) readerFuture.get.completeWithResult(v)
+ case Set(v:T) => if (readerFuture.isDefined) readerFuture.get.completeWithResult(v)
case Exit => exit
}
}
diff --git a/akka-core/src/main/scala/stm/JTA.scala b/akka-core/src/main/scala/stm/JTA.scala
new file mode 100644
index 0000000000..fd2db0fc74
--- /dev/null
+++ b/akka-core/src/main/scala/stm/JTA.scala
@@ -0,0 +1,202 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ * User newUser = new AtomicTemplate[User]() {
+ * User atomic() {
+ * ... // create user atomically
+ * return user;
+ * }
+ * }.execute();
+ *
+ */
+trait AtomicTemplate[T] {
+ def atomic: T
+ def execute: T = Transaction.Local.atomic {
+ atomic
+ }
+}
object Transaction {
val idFactory = new AtomicLong(-1L)
@@ -253,9 +276,9 @@ object Transaction {
createNewTransactionSet
} else getTransactionSetInScope
val tx = new Transaction
+ tx.begin
tx.transaction = Some(mtx)
setTransaction(Some(tx))
-
txSet.registerOnCommitTask(new Runnable() {
def run = tx.commit
})
@@ -269,31 +292,47 @@ object Transaction {
}
/**
- * The Akka specific Transaction class, keeping track of persistent data structures (as in on-disc).
+ * The Akka specific Transaction class, keeping track of persistent data structures (as in on-disc)
+ * and JTA support.
*
* @author Jonas Bonér
*/
@serializable class Transaction extends Logging {
+ val JTA_AWARE = config.getBool("akka.stm.jta-aware", false)
+
val id = Transaction.idFactory.incrementAndGet
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
private[akka] var transaction: Option[MultiverseTransaction] = None
private[this] val persistentStateMap = new HashMap[String, Committable]
private[akka] val depth = new AtomicInteger(0)
-
+
+ val jta: Option[TransactionContainer] =
+ if (JTA_AWARE) Some(TransactionContainer())
+ else None
+
log.trace("Creating %s", toString)
// --- public methods ---------
+ def begin = synchronized {
+ jta.foreach { txContainer =>
+ txContainer.begin
+ txContainer.registerSynchronization(new StmSynchronization(txContainer, this))
+ }
+ }
+
def commit = synchronized {
log.trace("Committing transaction %s", toString)
Transaction.atomic0 {
persistentStateMap.valuesIterator.foreach(_.commit)
}
status = TransactionStatus.Completed
+ jta.foreach(_.commit)
}
def abort = synchronized {
log.trace("Aborting transaction %s", toString)
+ jta.foreach(_.rollback)
}
def isNew = synchronized { status == TransactionStatus.New }
@@ -306,6 +345,8 @@ object Transaction {
// --- internal methods ---------
+ private def isJtaTxActive(status: Int) = status == Status.STATUS_ACTIVE
+
private[akka] def status_? = status
private[akka] def increment = depth.incrementAndGet
@@ -317,17 +358,17 @@ object Transaction {
private[akka] def register(uuid: String, storage: Committable) = persistentStateMap.put(uuid, storage)
private def ensureIsActive = if (status != TransactionStatus.Active)
- throw new IllegalStateException(
+ throw new StmConfigurationException(
"Expected ACTIVE transaction - current status [" + status + "]: " + toString)
private def ensureIsActiveOrAborted =
if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted))
- throw new IllegalStateException(
+ throw new StmConfigurationException(
"Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString)
private def ensureIsActiveOrNew =
if (!(status == TransactionStatus.Active || status == TransactionStatus.New))
- throw new IllegalStateException(
+ throw new StmConfigurationException(
"Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
// For reinitialize transaction after sending it over the wire
diff --git a/akka-core/src/main/scala/stm/TransactionManagement.scala b/akka-core/src/main/scala/stm/TransactionManagement.scala
index 371d57ad88..d551f7fd76 100644
--- a/akka-core/src/main/scala/stm/TransactionManagement.scala
+++ b/akka-core/src/main/scala/stm/TransactionManagement.scala
@@ -40,13 +40,13 @@ object TransactionManagement extends TransactionManagement {
private[akka] def getTransactionSet: CountDownCommitBarrier = {
val option = transactionSet.get
- if ((option eq null) || option.isEmpty) throw new IllegalStateException("No Transaction set in scope")
+ if ((option eq null) || option.isEmpty) throw new StmConfigurationException("No Transaction set in scope")
else option.get
}
private[akka] def getTransaction: Transaction = {
val option = transaction.get
- if ((option eq null) || option.isEmpty) throw new IllegalStateException("No Transaction in scope")
+ if ((option eq null) || option.isEmpty) throw new StmConfigurationException("No Transaction in scope")
option.get
}
}
diff --git a/akka-core/src/main/scala/stm/TransactionalState.scala b/akka-core/src/main/scala/stm/TransactionalState.scala
index e84beaa4f0..9bf4859ee5 100644
--- a/akka-core/src/main/scala/stm/TransactionalState.scala
+++ b/akka-core/src/main/scala/stm/TransactionalState.scala
@@ -62,6 +62,8 @@ trait Committable {
* @author Jonas Bonér
*/
object Ref {
+ type Ref[T] = TransactionalRef[T]
+
def apply[T]() = new Ref[T]
def apply[T](initialValue: T) = new Ref[T](Some(initialValue))
@@ -75,7 +77,7 @@ object Ref {
object TransactionalRef {
/**
- * An implicit conversion that converts an Option to an Iterable value.
+ * An implicit conversion that converts a TransactionalRef to an Iterable value.
*/
implicit def ref2Iterable[T](ref: TransactionalRef[T]): Iterable[T] = ref.toList
@@ -84,14 +86,6 @@ object TransactionalRef {
def apply[T](initialValue: T) = new TransactionalRef[T](Some(initialValue))
}
-/**
- * Implements a transactional managed reference.
- * Alias to TransactionalRef.
- *
- * @author Jonas Bonér
- */
-class Ref[T](initialOpt: Option[T] = None) extends TransactionalRef[T](initialOpt)
-
/**
* Implements a transactional managed reference.
* Alias to Ref.
@@ -99,6 +93,8 @@ class Ref[T](initialOpt: Option[T] = None) extends TransactionalRef[T](initialOp
* @author Jonas Bonér
*/
class TransactionalRef[T](initialOpt: Option[T] = None) extends Transactional {
+ self =>
+
import org.multiverse.api.ThreadLocalTransaction._
implicit val txInitName = "TransactionalRef:Init"
@@ -149,24 +145,36 @@ class TransactionalRef[T](initialOpt: Option[T] = None) extends Transactional {
ref.isNull
}
- def map[B](f: T => B): Option[B] = {
+ def map[B](f: T => B): TransactionalRef[B] = {
ensureIsInTransaction
- if (isEmpty) None else Some(f(ref.get))
+ if (isEmpty) TransactionalRef[B] else TransactionalRef(f(ref.get))
}
- def flatMap[B](f: T => Option[B]): Option[B] = {
+ def flatMap[B](f: T => TransactionalRef[B]): TransactionalRef[B] = {
ensureIsInTransaction
- if (isEmpty) None else f(ref.get)
+ if (isEmpty) TransactionalRef[B] else f(ref.get)
}
- def filter(p: T => Boolean): Option[T] = {
+ def filter(p: T => Boolean): TransactionalRef[T] = {
ensureIsInTransaction
- if (isEmpty || p(ref.get)) Some(ref.get) else None
+ if (isDefined && p(ref.get)) TransactionalRef(ref.get) else TransactionalRef[T]
}
- def foreach(f: T => Unit) {
+ /**
+ * Necessary to keep from being implicitly converted to Iterable in for comprehensions.
+ */
+ def withFilter(p: T => Boolean): WithFilter = new WithFilter(p)
+
+ class WithFilter(p: T => Boolean) {
+ def map[B](f: T => B): TransactionalRef[B] = self filter p map f
+ def flatMap[B](f: T => TransactionalRef[B]): TransactionalRef[B] = self filter p flatMap f
+ def foreach[U](f: T => U): Unit = self filter p foreach f
+ def withFilter(q: T => Boolean): WithFilter = new WithFilter(x => p(x) && q(x))
+ }
+
+ def foreach[U](f: T => U): Unit = {
ensureIsInTransaction
- if (!isEmpty) f(ref.get)
+ if (isDefined) f(ref.get)
}
def elements: Iterator[T] = {
diff --git a/akka-core/src/main/scala/stm/Vector.scala b/akka-core/src/main/scala/stm/Vector.scala
index b76281b909..a526906115 100644
--- a/akka-core/src/main/scala/stm/Vector.scala
+++ b/akka-core/src/main/scala/stm/Vector.scala
@@ -326,7 +326,7 @@ object Vector {
@inline
private[stm] def array(elems: AnyRef*) = {
val back = new Array[AnyRef](elems.length)
- Array.copy(elems, 0, back, 0, back.length)
+ Array.copy(elems.toArray, 0, back, 0, back.length)
back
}
diff --git a/akka-core/src/test/scala/PerformanceSpec.scala b/akka-core/src/test/scala/PerformanceSpec.scala
index 742a560f06..dd00d5ac3e 100644
--- a/akka-core/src/test/scala/PerformanceSpec.scala
+++ b/akka-core/src/test/scala/PerformanceSpec.scala
@@ -57,9 +57,9 @@ class PerformanceSpec extends JUnitSuite {
}
protected def sender : Option[Actor] = replyTo match {
- case Some(Left(actor)) => Some(actor)
- case _ => None
- }
+ case Some(Left(actor)) => Some(actor)
+ case _ => None
+ }
def receive = {
case MeetingCount(i) => {
@@ -104,9 +104,9 @@ class PerformanceSpec extends JUnitSuite {
}
protected def sender : Option[Actor] = replyTo match {
- case Some(Left(actor)) => Some(actor)
- case _ => None
- }
+ case Some(Left(actor)) => Some(actor)
+ case _ => None
+ }
override def receive: PartialFunction[Any, Unit] = {
case Meet(from, otherColour) =>
diff --git a/akka-core/src/test/scala/TransactionalRefSpec.scala b/akka-core/src/test/scala/TransactionalRefSpec.scala
new file mode 100644
index 0000000000..07c36ebdcf
--- /dev/null
+++ b/akka-core/src/test/scala/TransactionalRefSpec.scala
@@ -0,0 +1,137 @@
+package se.scalablesolutions.akka.stm
+
+import org.scalatest.Spec
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+
+@RunWith(classOf[JUnitRunner])
+class TransactionalRefSpec extends Spec with ShouldMatchers {
+
+ describe("A TransactionalRef") {
+ import Transaction.Local._
+
+ it("should optionally accept an initial value") {
+ val emptyRef = Ref[Int]
+ val empty = atomic { emptyRef.get }
+
+ empty should be(None)
+
+ val ref = Ref(3)
+ val value = atomic { ref.get.get }
+
+ value should be(3)
+ }
+
+ it("should be settable using swap") {
+ val ref = Ref[Int]
+
+ atomic { ref.swap(3) }
+
+ val value = atomic { ref.get.get }
+
+ value should be(3)
+ }
+
+ it("should be changeable using alter") {
+ val ref = Ref(0)
+
+ def increment = atomic {
+ ref alter (_ + 1)
+ }
+
+ increment
+ increment
+ increment
+
+ val value = atomic { ref.get.get }
+
+ value should be(3)
+ }
+
+ it("should not be changeable using alter if no value has been set") {
+ val ref = Ref[Int]
+
+ def increment = atomic {
+ ref alter (_ + 1)
+ }
+
+ evaluating { increment } should produce [RuntimeException]
+ }
+
+ it("should be able to be mapped") {
+ val ref1 = Ref(1)
+
+ val ref2 = atomic {
+ ref1 map (_ + 1)
+ }
+
+ val value1 = atomic { ref1.get.get }
+ val value2 = atomic { ref2.get.get }
+
+ value1 should be(1)
+ value2 should be(2)
+ }
+
+ it("should be able to be used in a 'foreach' for comprehension") {
+ val ref = Ref(3)
+
+ var result = 0
+
+ atomic {
+ for (value <- ref) {
+ result += value
+ }
+ }
+
+ result should be(3)
+ }
+
+ it("should be able to be used in a 'map' for comprehension") {
+ val ref1 = Ref(1)
+
+ val ref2 = atomic {
+ for (value <- ref1) yield value + 2
+ }
+
+ val value2 = atomic { ref2.get.get }
+
+ value2 should be(3)
+ }
+
+ it("should be able to be used in a 'flatMap' for comprehension") {
+ val ref1 = Ref(1)
+ val ref2 = Ref(2)
+
+ val ref3 = atomic {
+ for {
+ value1 <- ref1
+ value2 <- ref2
+ } yield value1 + value2
+ }
+
+ val value3 = atomic { ref3.get.get }
+
+ value3 should be(3)
+ }
+
+ it("should be able to be used in a 'filter' for comprehension") {
+ val ref1 = Ref(1)
+
+ val refLess2 = atomic {
+ for (value <- ref1 if value < 2) yield value
+ }
+
+ val optLess2 = atomic { refLess2.get }
+
+ val refGreater2 = atomic {
+ for (value <- ref1 if value > 2) yield value
+ }
+
+ val optGreater2 = atomic { refGreater2.get }
+
+ optLess2 should be(Some(1))
+ optGreater2 should be(None)
+ }
+ }
+}
diff --git a/akka-core/src/test/scala/VectorBugTestSuite.scala b/akka-core/src/test/scala/VectorBugTestSuite.scala
new file mode 100644
index 0000000000..658ace3681
--- /dev/null
+++ b/akka-core/src/test/scala/VectorBugTestSuite.scala
@@ -0,0 +1,17 @@
+package se.scalablesolutions.akka.stm
+
+import org.scalatest.FunSuite
+import Transaction.Global._
+
+class TransactionalVectorBugTestSuite extends FunSuite {
+
+ test("adding more than 32 items to a Vector shouldn't blow it up") {
+ atomic {
+ var v1 = new Vector[Int]()
+ for (i <- 0 to 31) {
+ v1 = v1 + i
+ }
+ v1 = v1 + 32
+ }
+ }
+}
diff --git a/akka-jta/src/main/scala/AtomikosTransactionService.scala b/akka-jta/src/main/scala/AtomikosTransactionService.scala
new file mode 100644
index 0000000000..937f31a54e
--- /dev/null
+++ b/akka-jta/src/main/scala/AtomikosTransactionService.scala
@@ -0,0 +1,40 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ * import TransactionContext._
+ *
+ * withTxRequired {
+ * ... // transactional stuff
+ * }
+ * // or
+ * withTxRequiresNew {
+ * ... // transactional stuff
+ * }
+ *
+ * Example usage 2:
+ *
+ * for {
+ * ctx <- TransactionContext.Required
+ * entity <- updatedEntities
+ * if !ctx.isRollbackOnly
+ * } {
+ * // transactional stuff
+ * ...
+ * }
+ *
+ * Example usage 3:
+ *
+ * val users = for {
+ * ctx <- TransactionContext.Required
+ * name <- userNames
+ * } yield {
+ * // transactional stuff
+ * ...
+ * }
+ *
+ *
+ * @author Jonas Bonér
+ */
+object TransactionContext extends TransactionProtocol with Logging {
+ implicit val tc = TransactionContainer()
+
+ private[TransactionContext] val stack = new scala.util.DynamicVariable(new TransactionContext(tc))
+
+ /**
+ * This method can be used to register a Synchronization instance for participating with the JTA transaction.
+ * Here is an example of how to add a JPA EntityManager integration.
+ *
+ * TransactionContext.registerSynchronization(new javax.transaction.Synchronization() {
+ * def beforeCompletion = {
+ * try {
+ * val status = tm.getStatus
+ * if (status != Status.STATUS_ROLLEDBACK &&
+ * status != Status.STATUS_ROLLING_BACK &&
+ * status != Status.STATUS_MARKED_ROLLBACK) {
+ * log.debug("Flushing EntityManager...")
+ * em.flush // flush EntityManager on success
+ * }
+ * } catch {
+ * case e: javax.transaction.SystemException => throw new RuntimeException(e)
+ * }
+ * }
+ *
+ * def afterCompletion(status: Int) = {
+ * val status = tm.getStatus
+ * if (closeAtTxCompletion) em.close
+ * if (status == Status.STATUS_ROLLEDBACK ||
+ * status == Status.STATUS_ROLLING_BACK ||
+ * status == Status.STATUS_MARKED_ROLLBACK) {
+ * em.close
+ * }
+ * }
+ * })
+ *
+ * You should also override the 'joinTransaction' and 'handleException' methods.
+ * See ScalaDoc for these methods in the 'TransactionProtocol' for details.
+ */
+ def registerSynchronization(sync: Synchronization) = synchronization.add(sync)
+
+ /**
+ * Registeres a join transaction function.
+ *
+ * Here is an example on how to integrate with JPA EntityManager.
+ *
+ *
+ * TransactionContext.registerJoinTransactionFun(() => {
+ * val em: EntityManager = ... // get the EntityManager
+ * em.joinTransaction // join JTA transaction
+ * })
+ *
+ */
+ def registerJoinTransactionFun(fn: () => Unit) = joinTransactionFuns.add(fn)
+
+ /**
+ * Handle exception. Can be overriden by concrete transaction service implementation.
+ *
+ * Here is an example on how to handle JPA exceptions.
+ *
+ * + * TransactionContext.registerExceptionNotToRollbackOn(classOf[NoResultException]) + * TransactionContext.registerExceptionNotToRollbackOn(classOf[NonUniqueResultException]) + *+ */ + def registerExceptionNotToRollbackOn(e: Class[_ <: Exception]) = exceptionsNotToRollbackOn.add(e) + + object Required extends TransactionMonad { + def map[T](f: TransactionMonad => T): T = withTxRequired { f(this) } + def flatMap[T](f: TransactionMonad => T): T = withTxRequired { f(this) } + def foreach(f: TransactionMonad => Unit): Unit = withTxRequired { f(this) } + } + + object RequiresNew extends TransactionMonad { + def map[T](f: TransactionMonad => T): T = withTxRequiresNew { f(this) } + def flatMap[T](f: TransactionMonad => T): T = withTxRequiresNew { f(this) } + def foreach(f: TransactionMonad => Unit): Unit = withTxRequiresNew { f(this) } + } + + object Supports extends TransactionMonad { + def map[T](f: TransactionMonad => T): T = withTxSupports { f(this) } + def flatMap[T](f: TransactionMonad => T): T = withTxSupports { f(this) } + def foreach(f: TransactionMonad => Unit): Unit = withTxSupports { f(this) } + } + + object Mandatory extends TransactionMonad { + def map[T](f: TransactionMonad => T): T = withTxMandatory { f(this) } + def flatMap[T](f: TransactionMonad => T): T = withTxMandatory { f(this) } + def foreach(f: TransactionMonad => Unit): Unit = withTxMandatory { f(this) } + } + + object Never extends TransactionMonad { + def map[T](f: TransactionMonad => T): T = withTxNever { f(this) } + def flatMap[T](f: TransactionMonad => T): T = withTxNever { f(this) } + def foreach(f: TransactionMonad => Unit): Unit = withTxNever { f(this) } + } + + object NoOpTransactionMonad extends TransactionMonad { + def map[T](f: TransactionMonad => T): T = f(this) + def flatMap[T](f: TransactionMonad => T): T = f(this) + def foreach(f: TransactionMonad => Unit): Unit = f(this) + override def filter(f: TransactionMonad => Boolean): TransactionMonad = this + } + + private[jta] def setRollbackOnly = current.setRollbackOnly + + private[jta] def isRollbackOnly = current.isRollbackOnly + + private[jta] def getTransactionContainer: TransactionContainer = current.getTransactionContainer + + private[this] def current = stack.value + + /** + * Continues with the invocation defined in 'body' with the brand new context define in 'newCtx', the old + * one is put on the stack and will automatically come back in scope when the method exits. + * + * Suspends and resumes the current JTA transaction. + */ + private[jta] def withNewContext[T](body: => T): T = { + val suspendedTx: Option[Transaction] = + if (getTransactionContainer.isInExistingTransaction) { + log.debug("Suspending TX") + Some(getTransactionContainer.suspend) + } else None + val result = stack.withValue(new TransactionContext(tc)) { body } + if (suspendedTx.isDefined) { + log.debug("Resuming TX") + getTransactionContainer.resume(suspendedTx.get) + } + result + } +} + +/** + * Base monad for the transaction monad implementations. + * + * @author Jonas Bonér + */ +trait TransactionMonad { + + // ----------------------------- + // Monadic definitions + // ----------------------------- + + def map[T](f: TransactionMonad => T): T + def flatMap[T](f: TransactionMonad => T): T + def foreach(f: TransactionMonad => Unit): Unit + def filter(f: TransactionMonad => Boolean): TransactionMonad = + if (f(this)) this else TransactionContext.NoOpTransactionMonad + + // ----------------------------- + // JTA Transaction definitions + // ----------------------------- + + /** + * Marks the current transaction as doomed. + */ + def setRollbackOnly = TransactionContext.setRollbackOnly + + /** + * Marks the current transaction as doomed. + */ + def doom = TransactionContext.setRollbackOnly + + /** + * Checks if the current transaction is doomed. + */ + def isRollbackOnly = TransactionContext.isRollbackOnly + + /** + * Checks that the current transaction is NOT doomed. + */ + def isNotDoomed = !TransactionContext.isRollbackOnly +} + +/** + * Transaction context, holds the EntityManager and the TransactionManager. + * + * @author Jonas Bonér + */ +class TransactionContext(val tc: TransactionContainer) { + def registerSynchronization(sync: Synchronization) = TransactionContext.registerSynchronization(sync) + def setRollbackOnly = tc.setRollbackOnly + def isRollbackOnly: Boolean = tc.getStatus == Status.STATUS_MARKED_ROLLBACK + def getTransactionContainer: TransactionContainer = tc +} diff --git a/akka-jta/src/main/scala/TransactionProtocol.scala b/akka-jta/src/main/scala/TransactionProtocol.scala new file mode 100644 index 0000000000..c23ec26fd7 --- /dev/null +++ b/akka-jta/src/main/scala/TransactionProtocol.scala @@ -0,0 +1,228 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB
+ * Trait that implements a JTA transaction service that obeys the transaction semantics defined + * in the transaction attribute types for the transacted methods according to the EJB 3 draft specification. + * The aspect handles UserTransaction, TransactionManager instance variable injection thru @javax.ejb.Inject + * (name subject to change as per EJB 3 spec) and method transaction levels thru @javax.ejb.TransactionAttribute. + *
+ * + *+ * This trait should be inherited to implement the getTransactionManager() method that should return a concrete + * javax.transaction.TransactionManager implementation (from JNDI lookup etc). + *
+ *+ *
+ *
+ *
+ *
TransactionRequiredException instead. If the method does start, then it will become part of the transaction of the caller. So if the EJB method signals a failure, the caller will be rolled back as well as the EJB.
+ *
+ * + *
+ *
+ *
RemoteException being thrown. This attribute is probably less useful than `NotSupported', in that NotSupported will assure that the caller's transaction is never affected by the EJB method (just as `Never' does), but will allow a call from a transactional caller if necessary.
+ *
+ *
+ * @author Jonas Bonér
+ */
+trait TransactionProtocol extends Logging {
+
+ protected val synchronization: JList[Synchronization] = new CopyOnWriteArrayList[Synchronization]
+ protected val joinTransactionFuns: JList[() => Unit] = new CopyOnWriteArrayList[() => Unit]
+ protected val exceptionsNotToRollbackOn: JList[Class[_ <: Exception]] = new CopyOnWriteArrayList[Class[_ <: Exception]]
+
+ def joinTransaction: Unit = {
+ val it = joinTransactionFuns.iterator
+ while (it.hasNext) {
+ val fn = it.next
+ fn()
+ }
+ }
+
+ def handleException(tm: TransactionContainer, e: Exception) = {
+ var rollback = true
+ val it = joinTransactionFuns.iterator
+ while (it.hasNext) {
+ val exception = it.next
+ if (e.getClass.isAssignableFrom(exception.getClass))
+ rollback = false
+ }
+ if (rollback) tm.setRollbackOnly
+ throw e
+ }
+
+ /**
+ * Wraps body in a transaction with REQUIRED semantics.
+ *
+ * Creates a new transaction if no transaction is active in scope, else joins the outer transaction.
+ */
+ def withTxRequired[T](body: => T): T = {
+ val tm = TransactionContext.getTransactionContainer
+ if (!isInExistingTransaction(tm)) {
+ tm.begin
+ registerSynchronization
+ try {
+ joinTransaction
+ body
+ } catch {
+ case e: Exception => handleException(tm, e)
+ } finally {
+ commitOrRollBack(tm)
+ }
+ } else body
+ }
+
+ /**
+ * Wraps body in a transaction with REQUIRES_NEW semantics.
+ *
+ * Suspends existing transaction, starts a new transaction, invokes body,
+ * commits or rollbacks new transaction, finally resumes previous transaction.
+ */
+ def withTxRequiresNew[T](body: => T): T = TransactionContext.withNewContext {
+ val tm = TransactionContext.getTransactionContainer
+ tm.begin
+ registerSynchronization
+ try {
+ joinTransaction
+ body
+ } catch {
+ case e: Exception => handleException(tm, e)
+ } finally {
+ commitOrRollBack(tm)
+ }
+ }
+
+ /**
+ * Wraps body in a transaction with NOT_SUPPORTED semantics.
+ *
+ * Suspends existing transaction, invokes body, resumes transaction.
+ */
+ def withTxNotSupported[T](body: => T): T = TransactionContext.withNewContext {
+ body
+ }
+
+ /**
+ * Wraps body in a transaction with SUPPORTS semantics.
+ *
+ * Basicalla a No-op.
+ */
+ def withTxSupports[T](body: => T): T = {
+ // attach to current if exists else skip -> do nothing
+ body
+ }
+
+ /**
+ * Wraps body in a transaction with MANDATORY semantics.
+ *
+ * Throws a TransactionRequiredException if there is no transaction active in scope.
+ */
+ def withTxMandatory[T](body: => T): T = {
+ if (!isInExistingTransaction(TransactionContext.getTransactionContainer))
+ throw new TransactionRequiredException("No active TX at method with TX type set to MANDATORY")
+ body
+ }
+
+ /**
+ * Wraps body in a transaction with NEVER semantics.
+ *
+ * Throws a SystemException in case of an existing transaction in scope.
+ */
+ def withTxNever[T](body: => T): T = {
+ if (isInExistingTransaction(TransactionContext.getTransactionContainer))
+ throw new SystemException("Detected active TX at method with TX type set to NEVER")
+ body
+ }
+
+ protected def commitOrRollBack(tm: TransactionContainer) = {
+ if (isInExistingTransaction(tm)) {
+ if (isRollbackOnly(tm)) {
+ log.debug("Rolling back TX marked as ROLLBACK_ONLY")
+ tm.rollback
+ } else {
+ log.debug("Committing TX")
+ tm.commit
+ }
+ }
+ }
+
+ // ---------------------------
+ // Helper methods
+ // ---------------------------
+
+ protected def registerSynchronization = {
+ val it = synchronization.iterator
+ while (it.hasNext) TransactionContext.getTransactionContainer.registerSynchronization(it.next)
+ }
+ /**
+ * Checks if a transaction is an existing transaction.
+ *
+ * @param tm the transaction manager
+ * @return boolean
+ */
+ protected def isInExistingTransaction(tm: TransactionContainer): Boolean =
+ tm.getStatus != Status.STATUS_NO_TRANSACTION
+
+ /**
+ * Checks if current transaction is set to rollback only.
+ *
+ * @param tm the transaction manager
+ * @return boolean
+ */
+ protected def isRollbackOnly(tm: TransactionContainer): Boolean =
+ tm.getStatus == Status.STATUS_MARKED_ROLLBACK
+
+ /**
+ * A ThreadLocal variable where to store suspended TX and enable pay as you go
+ * before advice - after advice data sharing in a specific case of requiresNew TX
+ */
+ private val suspendedTx = new ThreadLocal[Transaction] {
+ override def initialValue = null
+ }
+
+ private def storeInThreadLocal(tx: Transaction) = suspendedTx.set(tx)
+
+ private def fetchFromThreadLocal: Option[Transaction] = {
+ if (suspendedTx != null && suspendedTx.get() != null) Some(suspendedTx.get.asInstanceOf[Transaction])
+ else None
+ }
+}
diff --git a/akka-kernel/src/main/resources/jndi.properties b/akka-kernel/src/main/resources/jndi.properties
deleted file mode 100644
index 3485823a1e..0000000000
--- a/akka-kernel/src/main/resources/jndi.properties
+++ /dev/null
@@ -1 +0,0 @@
-java.naming.factory.initial=com.sun.enterprise.naming.SerialInitContextFactory
\ No newline at end of file
diff --git a/akka-patterns/src/main/scala/Patterns.scala b/akka-patterns/src/main/scala/Patterns.scala
index fc50292676..d8a49c74e3 100644
--- a/akka-patterns/src/main/scala/Patterns.scala
+++ b/akka-patterns/src/main/scala/Patterns.scala
@@ -85,3 +85,21 @@ class SmallestMailboxFirstIterator(items : List[Actor]) extends InfiniteIterator
items.reduceLeft((actor1, actor2) => actorWithSmallestMailbox(actor1,actor2))
}
}
+
+sealed trait ListenerMessage
+case class Listen(listener : Actor) extends ListenerMessage
+case class Deafen(listener : Actor) extends ListenerMessage
+case class WithListeners(f : Set[Actor] => Unit) extends ListenerMessage
+
+trait Listeners { self : Actor =>
+ import se.scalablesolutions.akka.actor.Agent
+ private lazy val listeners = Agent(Set[Actor]())
+
+ protected def listenerManagement : PartialFunction[Any,Unit] = {
+ case Listen(l) => listeners( _ + l)
+ case Deafen(l) => listeners( _ - l )
+ case WithListeners(f) => listeners foreach f
+ }
+
+ protected def gossip(msg : Any) = listeners foreach ( _ foreach ( _ ! msg ) )
+}
\ No newline at end of file
diff --git a/akka-patterns/src/test/scala/ActorPatternsTest.scala b/akka-patterns/src/test/scala/ActorPatternsTest.scala
index 0ce999add0..2235b1a1a7 100644
--- a/akka-patterns/src/test/scala/ActorPatternsTest.scala
+++ b/akka-patterns/src/test/scala/ActorPatternsTest.scala
@@ -90,6 +90,44 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat
}
}
})
+
+ @Test def testListener = verify(new TestActor {
+ import java.util.concurrent.{ CountDownLatch, TimeUnit }
+
+ def test = {
+ val latch = new CountDownLatch(2)
+ val num = new AtomicInteger(0)
+ val i = new Actor with Listeners {
+ def receive = listenerManagement orElse {
+ case "foo" => gossip("bar")
+ }
+ }
+ i.start
+
+ def newListener = actor {
+ case "bar" =>
+ num.incrementAndGet
+ latch.countDown
+ }
+
+ val a1 = newListener
+ val a2 = newListener
+ val a3 = newListener
+
+ handle(i,a1,a2,a3) {
+ i ! Listen(a1)
+ i ! Listen(a2)
+ i ! Listen(a3)
+ i ! Deafen(a3)
+
+ i ! "foo"
+
+ val done = latch.await(5,TimeUnit.SECONDS)
+ done must be (true)
+ num.get must be (2)
+ }
+ }
+ });
}
diff --git a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoStorageSpec.scala b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoStorageSpec.scala
index 186157b576..97307dde17 100644
--- a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoStorageSpec.scala
+++ b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoStorageSpec.scala
@@ -192,7 +192,7 @@ class MongoStorageSpec extends TestCase {
assertTrue(l.map(_._1).contains("3"))
assertTrue(l.map(_._1).contains("4"))
- val JsString(str) = l.filter(_._1 == "2").first._2
+ val JsString(str) = l.filter(_._1 == "2").head._2
assertEquals(str, "peter")
// trying to fetch for a non-existent transaction will throw
diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisPubSubServer.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisPubSubServer.scala
new file mode 100644
index 0000000000..c5621361fb
--- /dev/null
+++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisPubSubServer.scala
@@ -0,0 +1,42 @@
+package se.scalablesolutions.akka.persistence.redis
+
+import se.scalablesolutions.akka.actor.Actor
+import com.redis._
+
+sealed trait Msg
+case class Subscribe(channels: Array[String]) extends Msg
+case class Register(callback: PubSubMessage => Any) extends Msg
+case class Unsubscribe(channels: Array[String]) extends Msg
+case object UnsubscribeAll extends Msg
+case class Publish(channel: String, msg: String) extends Msg
+
+class Subscriber(client: RedisClient) extends Actor {
+ var callback: PubSubMessage => Any = { m => }
+
+ def receive = {
+ case Subscribe(channels) =>
+ client.subscribe(channels.head, channels.tail: _*)(callback)
+ reply(true)
+
+ case Register(cb) =>
+ callback = cb
+ reply(true)
+
+ case Unsubscribe(channels) =>
+ client.unsubscribe(channels.head, channels.tail: _*)
+ reply(true)
+
+ case UnsubscribeAll =>
+ client.unsubscribe
+ reply(true)
+ }
+}
+
+class Publisher(client: RedisClient) extends Actor {
+ def receive = {
+ case Publish(channel, message) =>
+ client.publish(channel, message)
+ reply(true)
+ }
+}
+
diff --git a/akka-samples/akka-sample-chat/README b/akka-samples/akka-sample-chat/README
index 88720d8c55..66e54e3d44 100644
--- a/akka-samples/akka-sample-chat/README
+++ b/akka-samples/akka-sample-chat/README
@@ -17,10 +17,10 @@ Then to run the sample:
- Set 'export AKKA_HOME=