diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index a5a81fe40c..8cace19031 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -4,6 +4,7 @@ package se.scalablesolutions.akka.actor +import _root_.se.scalablesolutions.akka.config.FaultHandlingStrategy import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory} import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future} @@ -35,7 +36,7 @@ object Annotations { */ object ActiveObject { val AKKA_CAMEL_ROUTING_SCHEME = "akka" - private[actor] val AW_PROXY_PREFIX = "$$ProxiedByAW".intern + private[actor] val AW_PROXY_PREFIX = "$$ProxiedByAW".intern def newInstance[T](target: Class[T], timeout: Long): T = newInstance(target, new Dispatcher(false, None), None, timeout) @@ -199,6 +200,71 @@ object ActiveObject { proxy.asInstanceOf[T] } + /** + * Get the underlying dispatcher actor for the given active object. + */ + def actorFor(obj: AnyRef): Option[Actor] = { + ActorRegistry.actorsFor(classOf[Dispatcher]).find(a=>a.target == Some(obj)) + } + + /** + * Links an other active object to this active object. + * @param supervisor the supervisor active object + * @param supervised the active object to link + */ + def link(supervisor: AnyRef, supervised: AnyRef) = { + val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't link when the supervisor is not an active object")) + val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't link when the supervised is not an active object")) + supervisorActor !! Link(supervisedActor) + } + + /** + * Links an other active object to this active object and sets the fault handling for the supervisor. + * @param supervisor the supervisor active object + * @param supervised the active object to link + * @param handler fault handling strategy + * @param trapExceptions array of exceptions that should be handled by the supervisor + */ + def link(supervisor: AnyRef, supervised: AnyRef, handler: FaultHandlingStrategy, trapExceptions: Array[Class[_ <: Throwable]]) = { + val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't link when the supervisor is not an active object")) + val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't link when the supervised is not an active object")) + supervisorActor.trapExit = trapExceptions.toList + supervisorActor.faultHandler = Some(handler) + supervisorActor !! Link(supervisedActor) + } + + /** + * Unlink the supervised active object from the supervisor. + * @param supervisor the supervisor active object + * @param supervised the active object to unlink + */ + def unlink(supervisor: AnyRef, supervised: AnyRef) = { + val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't unlink when the supervisor is not an active object")) + val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't unlink when the supervised is not an active object")) + supervisorActor !! Unlink(supervisedActor) + } + + /** + * Sets the trap exit for the given supervisor active object. + * @param supervisor the supervisor active object + * @param trapExceptions array of exceptions that should be handled by the supervisor + */ + def trapExit(supervisor: AnyRef, trapExceptions: Array[Class[_ <: Throwable]]) = { + val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't set trap exceptions when the supervisor is not an active object")) + supervisorActor.trapExit = trapExceptions.toList + this + } + + /** + * Sets the fault handling strategy for the given supervisor active object. + * @param supervisor the supervisor active object + * @param handler fault handling strategy + */ + def faultHandler(supervisor: AnyRef, handler: FaultHandlingStrategy) = { + val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't set fault handler when the supervisor is not an active object")) + supervisorActor.faultHandler = Some(handler) + this + } private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor = { val factory = SupervisorFactory(SupervisorConfig(restartStrategy, components)) @@ -215,19 +281,19 @@ private[akka] object AspectInitRegistry { val init = initializations.get(target) initializations.remove(target) init - } + } def register(target: AnyRef, init: AspectInit) = initializations.put(target, init) } private[akka] sealed case class AspectInit( val target: Class[_], - val actor: Dispatcher, + val actor: Dispatcher, val remoteAddress: Option[InetSocketAddress], val timeout: Long) { def this(target: Class[_],actor: Dispatcher, timeout: Long) = this(target, actor, None, timeout) } - + /** * AspectWerkz Aspect that is turning POJOs into Active Object. * Is deployed on a 'per-instance' basis. @@ -248,7 +314,7 @@ private[akka] sealed class ActiveObjectAspect { if (!isInitialized) { val init = AspectInitRegistry.initFor(joinPoint.getThis) target = init.target - actor = init.actor + actor = init.actor remoteAddress = init.remoteAddress timeout = init.timeout isInitialized = true @@ -267,7 +333,7 @@ private[akka] sealed class ActiveObjectAspect { (actor ! Invocation(joinPoint, true, true) ).asInstanceOf[AnyRef] } else { - val result = actor !! Invocation(joinPoint, false, isVoid(rtti)) + val result = actor !! (Invocation(joinPoint, false, isVoid(rtti)), timeout) if (result.isDefined) result.get else throw new IllegalStateException("No result defined for invocation [" + joinPoint + "]") } @@ -307,7 +373,7 @@ private[akka] sealed class ActiveObjectAspect { val (_, cause) = future.exception.get throw cause } else future.result.asInstanceOf[Option[T]] - + private def isOneWay(rtti: MethodRtti) = rtti.getMethod.isAnnotationPresent(Annotations.oneway) private def isVoid(rtti: MethodRtti) = rtti.getMethod.getReturnType == java.lang.Void.TYPE @@ -353,9 +419,12 @@ private[akka] sealed class ActiveObjectAspect { } } +// Jan Kronquist: started work on issue 121 +private[akka] case class Link(val actor: Actor) + object Dispatcher { val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]() - val ZERO_ITEM_OBJECT_ARRAY = Array[Object]() + val ZERO_ITEM_OBJECT_ARRAY = Array[Object]() } /** @@ -393,7 +462,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op "Could not find post restart method [" + post + "] \nin [" + targetClass.getName + "]. \nIt must have a zero argument definition.") }) } - // See if we have any annotation defined restart callbacks + // See if we have any annotation defined restart callbacks if (!preRestart.isDefined) preRestart = methods.find(m => m.isAnnotationPresent(Annotations.prerestart)) if (!postRestart.isDefined) postRestart = methods.find(m => m.isAnnotationPresent(Annotations.postrestart)) @@ -406,7 +475,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op if (preRestart.isDefined) preRestart.get.setAccessible(true) if (postRestart.isDefined) postRestart.get.setAccessible(true) - + // see if we have a method annotated with @inittransactionalstate, if so invoke it initTxState = methods.find(m => m.isAnnotationPresent(Annotations.inittransactionalstate)) if (initTxState.isDefined && initTxState.get.getParameterTypes.length != 0) throw new IllegalStateException("Method annotated with @inittransactionalstate must have a zero argument definition") @@ -418,6 +487,9 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint) if (isOneWay) joinPoint.proceed else reply(joinPoint.proceed) +// Jan Kronquist: started work on issue 121 + case Link(target) => link(target) + case Unlink(target) => unlink(target) case unexpected => throw new IllegalStateException("Unexpected message [" + unexpected + "] sent to [" + this + "]") } @@ -468,6 +540,6 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op if (!unserializable && hasMutableArgument) { val copyOfArgs = Serializer.Java.deepClone(args) joinPoint.getRtti.asInstanceOf[MethodRtti].setParameterValues(copyOfArgs.asInstanceOf[Array[AnyRef]]) - } + } } } diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 113fab5f32..37a297d5ca 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -52,6 +52,7 @@ case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifeCycleMe case class Restart(reason: Throwable) extends LifeCycleMessage case class Exit(dead: Actor, killer: Throwable) extends LifeCycleMessage case class Unlink(child: Actor) extends LifeCycleMessage +case class UnlinkAndStop(child: Actor) extends LifeCycleMessage case object Kill extends LifeCycleMessage class ActorKilledException private[akka](message: String) extends RuntimeException(message) @@ -151,29 +152,6 @@ object Actor extends Logging { def receive = body } - /** - * Use to create an anonymous event-driven remote actor. - *
- * The actor is created with a 'permanent' life-cycle configuration, which means that - * if the actor is supervised and dies it will be restarted. - * - * The actor is started when created. - * Example: - *
- * import Actor._
- *
- * val a = remoteActor("localhost", 9999) {
- * case msg => ... // handle message
- * }
- *
- */
- def remoteActor(hostname: String, port: Int)(body: PartialFunction[Any, Unit]): Actor = new Actor() {
- lifeCycle = Some(LifeCycle(Permanent))
- makeRemote(hostname, port)
- start
- def receive = body
- }
-
/**
* Use to create an anonymous event-driven actor with both an init block and a message loop block.
*
@@ -244,7 +222,7 @@ object Actor extends Logging {
* @author Jonas Bonér
*/
trait Actor extends TransactionManagement with Logging {
- implicit protected val self: Option[Actor] = Some(this)
+ implicit protected val self: Some[Actor] = Some(this)
// Only mutable for RemoteServer in order to maintain identity across nodes
private[akka] var _uuid = UUID.newUuid.toString
@@ -255,7 +233,6 @@ trait Actor extends TransactionManagement with Logging {
@volatile private[this] var _isRunning = false
@volatile private[this] var _isSuspended = true
@volatile private[this] var _isShutDown = false
- @volatile private[this] var _isEventBased: Boolean = false
@volatile private[akka] var _isKilled = false
private var _hotswap: Option[PartialFunction[Any, Unit]] = None
private[akka] var _remoteAddress: Option[InetSocketAddress] = None
@@ -275,7 +252,10 @@ trait Actor extends TransactionManagement with Logging {
// ====================================
/**
- * TODO: Document replyTo
+ * Holds the reference to the sender of the currently processed message.
+ * Is None if no sender was specified
+ * Is Some(Left(Actor)) if sender is an actor
+ * Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result
*/
protected var replyTo: Option[Either[Actor,CompletableFuture]] = None
@@ -317,11 +297,7 @@ trait Actor extends TransactionManagement with Logging {
* The default is also that all actors that are created and spawned from within this actor
* is sharing the same dispatcher as its creator.
*/
- protected[akka] var messageDispatcher: MessageDispatcher = {
- val dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
- _isEventBased = dispatcher.isInstanceOf[ExecutorBasedEventDrivenDispatcher]
- dispatcher
- }
+ protected[akka] var messageDispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
/**
* User overridable callback/setting.
@@ -341,7 +317,7 @@ trait Actor extends TransactionManagement with Logging {
* trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError])
*
*/
- protected var trapExit: List[Class[_ <: Throwable]] = Nil
+ protected[akka] var trapExit: List[Class[_ <: Throwable]] = Nil
/**
* User overridable callback/setting.
@@ -354,7 +330,7 @@ trait Actor extends TransactionManagement with Logging {
* faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
*
*/
- protected var faultHandler: Option[FaultHandlingStrategy] = None
+ protected[akka] var faultHandler: Option[FaultHandlingStrategy] = None
/**
* User overridable callback/setting.
@@ -536,13 +512,17 @@ trait Actor extends TransactionManagement with Logging {
if (isActiveObject) throw e
else None
}
- getResultOrThrowException(future)
- } else throw new IllegalStateException(
+
+ if (future.exception.isDefined) throw future.exception.get._2
+ else future.result.asInstanceOf[Option[T]]
+ }
+ else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
/**
* Sends a message asynchronously and waits on a future for a reply message.
+ * Uses the time-out defined in the Actor.
*
* It waits on the reply either until it receives it (in the form of Some(replyMessage))
* or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics.
@@ -572,11 +552,10 @@ trait Actor extends TransactionManagement with Logging {
*
* Works with both '!' and '!!'.
*/
- def forward(message: Any)(implicit sender: Option[Actor] = None) = {
+ def forward(message: Any)(implicit sender: Some[Actor]) = {
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (_isRunning) {
- val forwarder = sender.getOrElse(throw new IllegalStateException("Can't forward message when the forwarder/mediator is not an actor"))
- forwarder.replyTo match {
+ sender.get.replyTo match {
case Some(Left(actor)) => postMessageToMailbox(message, Some(actor))
case Some(Right(future)) => postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, Some(future))
case _ => throw new IllegalStateException("Can't forward message when initial sender is not an actor")
@@ -615,7 +594,6 @@ trait Actor extends TransactionManagement with Logging {
messageDispatcher.unregister(this)
messageDispatcher = md
messageDispatcher.register(this)
- _isEventBased = messageDispatcher.isInstanceOf[ExecutorBasedEventDrivenDispatcher]
} else throw new IllegalArgumentException(
"Can not swap dispatcher for " + toString + " after it has been started")
}
@@ -734,7 +712,7 @@ trait Actor extends TransactionManagement with Logging {
*
* To be invoked from within the actor itself.
*/
- protected[this] def spawnRemote[T <: Actor : Manifest](hostname: String, port: Int): T = {
+ protected[this] def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): T = {
val actor = spawnButDoNotStart[T]
actor.makeRemote(hostname, port)
actor.start
@@ -746,7 +724,7 @@ trait Actor extends TransactionManagement with Logging {
*
* To be invoked from within the actor itself.
*/
- protected[this] def spawnLink[T <: Actor : Manifest] : T = {
+ protected[this] def spawnLink[T <: Actor: Manifest]: T = {
val actor = spawnButDoNotStart[T]
try {
actor.start
@@ -838,7 +816,7 @@ trait Actor extends TransactionManagement with Logging {
RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, None)
} else {
val invocation = new MessageInvocation(this, message, sender.map(Left(_)), transactionSet.get)
- if (_isEventBased) {
+ if (messageDispatcher.usesActorMailbox) {
_mailbox.add(invocation)
if (_isSuspended) invocation.send
}
@@ -871,10 +849,11 @@ trait Actor extends TransactionManagement with Logging {
val future = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture(timeout)
val invocation = new MessageInvocation(this, message, Some(Right(future)), transactionSet.get)
- if (_isEventBased) {
+
+ if (messageDispatcher.usesActorMailbox)
_mailbox.add(invocation)
- invocation.send
- } else invocation.send
+
+ invocation.send
future
}
}
@@ -980,18 +959,15 @@ trait Actor extends TransactionManagement with Logging {
}
}
- private def getResultOrThrowException[T](future: Future): Option[T] =
- if (future.exception.isDefined) throw future.exception.get._2
- else future.result.asInstanceOf[Option[T]]
-
private def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive)
private val lifeCycles: PartialFunction[Any, Unit] = {
- case HotSwap(code) => _hotswap = code
- case Restart(reason) => restart(reason)
- case Exit(dead, reason) => handleTrapExit(dead, reason)
- case Unlink(child) => unlink(child); child.stop
- case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
+ case HotSwap(code) => _hotswap = code
+ case Restart(reason) => restart(reason)
+ case Exit(dead, reason) => handleTrapExit(dead, reason)
+ case Unlink(child) => unlink(child)
+ case UnlinkAndStop(child) => unlink(child); child.stop
+ case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
}
private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = {
@@ -1024,7 +1000,7 @@ trait Actor extends TransactionManagement with Logging {
// if last temporary actor is gone, then unlink me from supervisor
if (getLinkedActors.isEmpty) {
Actor.log.info("All linked actors have died permanently (they were all configured as TEMPORARY)\n\tshutting down and unlinking supervisor actor as well [%s].", actor.id)
- _supervisor.foreach(_ ! Unlink(this))
+ _supervisor.foreach(_ ! UnlinkAndStop(this))
}
}
}
diff --git a/akka-core/src/main/scala/actor/Agent.scala b/akka-core/src/main/scala/actor/Agent.scala
index a0ca0c90eb..b6a65423a3 100644
--- a/akka-core/src/main/scala/actor/Agent.scala
+++ b/akka-core/src/main/scala/actor/Agent.scala
@@ -90,7 +90,7 @@ class AgentException private[akka](message: String) extends RuntimeException(mes
*
* IMPORTANT:
* You can *not* call 'agent.get', 'agent()' or use the monadic 'foreach',
-* 'map and 'flatMap' within an enclosing transaction since that would block
+* 'map' and 'flatMap' within an enclosing transaction since that would block
* the transaction indefinitely. But all other operations are fine. The system
* will raise an error (e.g. *not* deadlock) if you try to do so, so as long as
* you test your application thoroughly you should be fine.
@@ -99,11 +99,13 @@ class AgentException private[akka](message: String) extends RuntimeException(mes
* @author Jonas Bonér
*/
sealed class Agent[T] private (initialValue: T) extends Transactor {
+ start
import Agent._
+ log.debug("Starting up Agent [%s]", _uuid)
+
private lazy val value = Ref[T]()
- start
- this !! Value(initialValue)
+ this ! Value(initialValue)
/**
* Periodically handles incoming messages.
diff --git a/akka-core/src/main/scala/dispatch/Dispatchers.scala b/akka-core/src/main/scala/dispatch/Dispatchers.scala
index 49d9c624b6..2030d2026e 100644
--- a/akka-core/src/main/scala/dispatch/Dispatchers.scala
+++ b/akka-core/src/main/scala/dispatch/Dispatchers.scala
@@ -11,7 +11,7 @@ import se.scalablesolutions.akka.actor.Actor
*
* Example usage:
*
- * val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name")
+ * val dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name")
* dispatcher
* .withNewThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
@@ -25,7 +25,7 @@ import se.scalablesolutions.akka.actor.Actor
*
* Example usage:
*
- * MessageDispatcher dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name");
+ * MessageDispatcher dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name");
* dispatcher
* .withNewThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
@@ -40,9 +40,8 @@ import se.scalablesolutions.akka.actor.Actor
*/
object Dispatchers {
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") {
- override def register(actor : Actor) = {
- if (isShutdown)
- init
+ override def register(actor: Actor) = {
+ if (isShutdown) init
super.register(actor)
}
}
diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
index 705c3ee142..0c624c2e3a 100644
--- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
@@ -94,6 +94,8 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
active = false
references.clear
}
+
+ def usesActorMailbox = true
def ensureNotActive: Unit = if (active) throw new IllegalStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")
diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
index a769f92e55..28fe624b86 100644
--- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
@@ -168,7 +168,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
private def donateMessage(receiver: Actor, thief: Actor): Option[MessageInvocation] = {
val donated = receiver._mailbox.pollLast
if (donated != null) {
- thief.forward(donated.message)(Some(donated.receiver))
+ thief ! donated.message
return Some(donated)
} else return None
}
@@ -199,6 +199,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
pooledActors.remove(actor)
super.unregister(actor)
}
+
+ def usesActorMailbox = true
private def verifyActorsAreOfSameType(newActor: Actor) = {
actorType match {
diff --git a/akka-core/src/main/scala/dispatch/Reactor.scala b/akka-core/src/main/scala/dispatch/Reactor.scala
index f9db74190f..3f300b1c52 100644
--- a/akka-core/src/main/scala/dispatch/Reactor.scala
+++ b/akka-core/src/main/scala/dispatch/Reactor.scala
@@ -68,6 +68,7 @@ trait MessageDispatcher extends Logging {
}
def canBeShutDown: Boolean = references.isEmpty
def isShutdown: Boolean
+ def usesActorMailbox : Boolean
}
trait MessageDemultiplexer {
diff --git a/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala
index 15af513d62..fc99cf88d2 100644
--- a/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala
@@ -37,6 +37,8 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(name: String) extends Abstra
}
def isShutdown = !active
+
+ def usesActorMailbox = false
class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
diff --git a/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala
index 941e701410..3f33d4ffc0 100644
--- a/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala
@@ -134,6 +134,8 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
if (fair) true
else nrOfBusyMessages < 100
}
+
+ def usesActorMailbox = false
def ensureNotActive: Unit = if (active) throw new IllegalStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")
diff --git a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala
index 8b1463f655..fbfffc999e 100644
--- a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala
@@ -41,6 +41,8 @@ class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler:
def isShutdown = !active
+ def usesActorMailbox = false
+
def shutdown = if (active) {
log.debug("Shutting down ThreadBasedDispatcher [%s]", name)
active = false
diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala
index 72a7f37229..81d5591fbb 100644
--- a/akka-core/src/main/scala/remote/RemoteClient.scala
+++ b/akka-core/src/main/scala/remote/RemoteClient.scala
@@ -200,7 +200,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
val channel = connection.awaitUninterruptibly.getChannel
openChannels.add(channel)
if (!connection.isSuccess) {
- listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(connection.getCause))
+ listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(connection.getCause))
log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
}
isRunning = true
@@ -232,7 +232,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
}
} else {
val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.")
- listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(exception))
+ listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(exception))
throw exception
}
@@ -325,12 +325,12 @@ class RemoteClientHandler(val name: String,
futures.remove(reply.getId)
} else {
val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result)
- client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(exception))
+ client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(exception))
throw exception
}
} catch {
case e: Exception =>
- client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(e))
+ client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(e))
log.error("Unexpected exception in remote client handler: %s", e)
throw e
}
@@ -345,7 +345,7 @@ class RemoteClientHandler(val name: String,
// Wait until the connection attempt succeeds or fails.
client.connection.awaitUninterruptibly
if (!client.connection.isSuccess) {
- client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(client.connection.getCause))
+ client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(client.connection.getCause))
log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
}
}
@@ -353,17 +353,17 @@ class RemoteClientHandler(val name: String,
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
- client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientConnected(client.hostname, client.port))
+ client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientConnected(client.hostname, client.port))
log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
- client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientDisconnected(client.hostname, client.port))
+ client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientDisconnected(client.hostname, client.port))
log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress)
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
- client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(event.getCause))
+ client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(event.getCause))
log.error(event.getCause, "Unexpected exception from downstream in remote client")
event.getChannel.close
}
diff --git a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala
index 1156a34b27..65558dd997 100644
--- a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala
+++ b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala
@@ -4,7 +4,7 @@
package se.scalablesolutions.akka.remote
-//import se.scalablesolutions.akka.serialization.Serializable.SBinary
+import se.scalablesolutions.akka.serialization.Serializable.SBinary
import se.scalablesolutions.akka.serialization.{Serializer, Serializable, SerializationProtocol}
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
@@ -14,7 +14,7 @@ object RemoteProtocolBuilder {
private var SERIALIZER_JAVA: Serializer.Java = Serializer.Java
private var SERIALIZER_JAVA_JSON: Serializer.JavaJSON = Serializer.JavaJSON
private var SERIALIZER_SCALA_JSON: Serializer.ScalaJSON = Serializer.ScalaJSON
- //private var SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary
+ private var SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary
private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf
@@ -26,9 +26,9 @@ object RemoteProtocolBuilder {
def getMessage(request: RemoteRequest): Any = {
request.getProtocol match {
- //case SerializationProtocol.SBINARY =>
- // val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
- // renderer.fromBytes(request.getMessage.toByteArray)
+ case SerializationProtocol.SBINARY =>
+ val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
+ renderer.fromBytes(request.getMessage.toByteArray)
case SerializationProtocol.SCALA_JSON =>
val manifest = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_SCALA_JSON.in(request.getMessage.toByteArray, Some(Class.forName(manifest)))
@@ -47,9 +47,9 @@ object RemoteProtocolBuilder {
def getMessage(reply: RemoteReply): Any = {
reply.getProtocol match {
- //case SerializationProtocol.SBINARY =>
- // val renderer = Class.forName(new String(reply.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
- // renderer.fromBytes(reply.getMessage.toByteArray)
+ case SerializationProtocol.SBINARY =>
+ val renderer = Class.forName(new String(reply.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
+ renderer.fromBytes(reply.getMessage.toByteArray)
case SerializationProtocol.SCALA_JSON =>
val manifest = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_SCALA_JSON.in(reply.getMessage.toByteArray, Some(Class.forName(manifest)))
@@ -67,12 +67,12 @@ object RemoteProtocolBuilder {
}
def setMessage(message: Any, builder: RemoteRequest.Builder) = {
- /*if (message.isInstanceOf[Serializable.SBinary[_]]) {
+ if (message.isInstanceOf[Serializable.SBinary[_]]) {
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]]
builder.setProtocol(SerializationProtocol.SBINARY)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
- } else*/ if (message.isInstanceOf[Message]) {
+ } else if (message.isInstanceOf[Message]) {
val serializable = message.asInstanceOf[Message]
builder.setProtocol(SerializationProtocol.PROTOBUF)
builder.setMessage(ByteString.copyFrom(serializable.toByteArray))
@@ -95,12 +95,12 @@ object RemoteProtocolBuilder {
}
def setMessage(message: Any, builder: RemoteReply.Builder) = {
- /*if (message.isInstanceOf[Serializable.SBinary[_]]) {
+ if (message.isInstanceOf[Serializable.SBinary[_]]) {
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]]
builder.setProtocol(SerializationProtocol.SBINARY)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
- } else*/ if (message.isInstanceOf[Message]) {
+ } else if (message.isInstanceOf[Message]) {
val serializable = message.asInstanceOf[Message]
builder.setProtocol(SerializationProtocol.PROTOBUF)
builder.setMessage(ByteString.copyFrom(serializable.toByteArray))
diff --git a/akka-core/src/main/scala/stm/HashTrie.scala b/akka-core/src/main/scala/stm/HashTrie.scala
index 8ef4138d85..91930390c5 100644
--- a/akka-core/src/main/scala/stm/HashTrie.scala
+++ b/akka-core/src/main/scala/stm/HashTrie.scala
@@ -30,7 +30,7 @@
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
- **/
+ */
package se.scalablesolutions.akka.stm
@@ -52,9 +52,7 @@ final class HashTrie[K, +V] private (root: Node[K, V]) extends Map[K, V] with Pe
def get(key: K) = root(key, key.hashCode)
- override def +[A >: V](pair: (K, A)) = pair match {
- case (k, v) => update(k, v)
- }
+ override def +[A >: V](pair: (K, A)) = update(pair._1, pair._2)
override def update[A >: V](key: K, value: A) = new HashTrie(root(0, key, key.hashCode) = value)
@@ -68,7 +66,7 @@ final class HashTrie[K, +V] private (root: Node[K, V]) extends Map[K, V] with Pe
}
object HashTrie {
- def apply[K, V](pairs: (K, V)*) = pairs.foldLeft((new HashTrie[K, V]).asInstanceOf[Map[K,V]]) { _ + _ }
+ def apply[K, V](pairs: (K, V)*) = pairs.foldLeft(new HashTrie[K, V]) { _ + _ }
def unapplySeq[K, V](map: HashTrie[K, V]) = map.toSeq
}
diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala
index 16a3cd0e5f..209c131781 100644
--- a/akka-core/src/main/scala/stm/Transaction.scala
+++ b/akka-core/src/main/scala/stm/Transaction.scala
@@ -22,6 +22,25 @@ import org.multiverse.stms.alpha.AlphaStm
class NoTransactionInScopeException extends RuntimeException
class TransactionRetryException(message: String) extends RuntimeException(message)
+/**
+ * FIXDOC: document AtomicTemplate
+ * AtomicTemplate can be used to create atomic blocks from Java code.
+ *
+ * User newUser = new AtomicTemplate[User]() {
+ * User atomic() {
+ * ... // create user atomically
+ * return user;
+ * }
+ * }.execute();
+ *
+ */
+trait AtomicTemplate[T] {
+ def atomic: T
+ def execute: T = Transaction.Local.atomic {
+ atomic
+ }
+}
+
object Transaction {
val idFactory = new AtomicLong(-1L)
diff --git a/akka-core/src/main/scala/stm/TransactionalState.scala b/akka-core/src/main/scala/stm/TransactionalState.scala
index 7afb3fb6bb..9bf4859ee5 100644
--- a/akka-core/src/main/scala/stm/TransactionalState.scala
+++ b/akka-core/src/main/scala/stm/TransactionalState.scala
@@ -32,8 +32,13 @@ import org.multiverse.stms.alpha.AlphaRef
*/
object TransactionalState {
def newMap[K, V] = TransactionalMap[K, V]()
+ def newMap[K, V](pairs: (K, V)*) = TransactionalMap(pairs: _*)
+
def newVector[T] = TransactionalVector[T]()
+ def newVector[T](elems: T*) = TransactionalVector(elems :_*)
+
def newRef[T] = TransactionalRef[T]()
+ def newRef[T](initialValue: T) = TransactionalRef(initialValue)
}
/**
@@ -57,7 +62,11 @@ trait Committable {
* @author Jonas Bonér
*/
object Ref {
+ type Ref[T] = TransactionalRef[T]
+
def apply[T]() = new Ref[T]
+
+ def apply[T](initialValue: T) = new Ref[T](Some(initialValue))
}
/**
@@ -68,20 +77,14 @@ object Ref {
object TransactionalRef {
/**
- * An implicit conversion that converts an Option to an Iterable value.
+ * An implicit conversion that converts a TransactionalRef to an Iterable value.
*/
implicit def ref2Iterable[T](ref: TransactionalRef[T]): Iterable[T] = ref.toList
def apply[T]() = new TransactionalRef[T]
-}
-/**
- * Implements a transactional managed reference.
- * Alias to TransactionalRef.
- *
- * @author Jonas Bonér
- */
-class Ref[T] extends TransactionalRef[T]
+ def apply[T](initialValue: T) = new TransactionalRef[T](Some(initialValue))
+}
/**
* Implements a transactional managed reference.
@@ -89,19 +92,32 @@ class Ref[T] extends TransactionalRef[T]
*
* @author Jonas Bonér
*/
-class TransactionalRef[T] extends Transactional {
+class TransactionalRef[T](initialOpt: Option[T] = None) extends Transactional {
+ self =>
+
import org.multiverse.api.ThreadLocalTransaction._
implicit val txInitName = "TransactionalRef:Init"
val uuid = UUID.newUuid.toString
- private[this] lazy val ref: AlphaRef[T] = new AlphaRef
+ private[this] lazy val ref = {
+ val r = new AlphaRef[T]
+ initialOpt.foreach(r.set(_))
+ r
+ }
def swap(elem: T) = {
ensureIsInTransaction
ref.set(elem)
}
+ def alter(f: T => T): T = {
+ ensureIsInTransaction
+ ensureNotNull
+ ref.set(f(ref.get))
+ ref.get
+ }
+
def get: Option[T] = {
ensureIsInTransaction
if (ref.isNull) None
@@ -129,24 +145,36 @@ class TransactionalRef[T] extends Transactional {
ref.isNull
}
- def map[B](f: T => B): Option[B] = {
+ def map[B](f: T => B): TransactionalRef[B] = {
ensureIsInTransaction
- if (isEmpty) None else Some(f(ref.get))
+ if (isEmpty) TransactionalRef[B] else TransactionalRef(f(ref.get))
}
- def flatMap[B](f: T => Option[B]): Option[B] = {
+ def flatMap[B](f: T => TransactionalRef[B]): TransactionalRef[B] = {
ensureIsInTransaction
- if (isEmpty) None else f(ref.get)
+ if (isEmpty) TransactionalRef[B] else f(ref.get)
}
- def filter(p: T => Boolean): Option[T] = {
+ def filter(p: T => Boolean): TransactionalRef[T] = {
ensureIsInTransaction
- if (isEmpty || p(ref.get)) Some(ref.get) else None
+ if (isDefined && p(ref.get)) TransactionalRef(ref.get) else TransactionalRef[T]
}
- def foreach(f: T => Unit) {
+ /**
+ * Necessary to keep from being implicitly converted to Iterable in for comprehensions.
+ */
+ def withFilter(p: T => Boolean): WithFilter = new WithFilter(p)
+
+ class WithFilter(p: T => Boolean) {
+ def map[B](f: T => B): TransactionalRef[B] = self filter p map f
+ def flatMap[B](f: T => TransactionalRef[B]): TransactionalRef[B] = self filter p flatMap f
+ def foreach[U](f: T => U): Unit = self filter p foreach f
+ def withFilter(q: T => Boolean): WithFilter = new WithFilter(x => p(x) && q(x))
+ }
+
+ def foreach[U](f: T => U): Unit = {
ensureIsInTransaction
- if (!isEmpty) f(ref.get)
+ if (isDefined) f(ref.get)
}
def elements: Iterator[T] = {
@@ -171,10 +199,15 @@ class TransactionalRef[T] extends Transactional {
private def ensureIsInTransaction =
if (getThreadLocalTransaction eq null) throw new NoTransactionInScopeException
+
+ private def ensureNotNull =
+ if (ref.isNull) throw new RuntimeException("Cannot alter Ref's value when it is null")
}
object TransactionalMap {
def apply[K, V]() = new TransactionalMap[K, V]
+
+ def apply[K, V](pairs: (K, V)*) = new TransactionalMap(Some(HashTrie(pairs: _*)))
}
/**
@@ -184,11 +217,10 @@ object TransactionalMap {
*
* @author Jonas Bonér
*/
-class TransactionalMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] {
- protected[this] val ref = TransactionalRef[HashTrie[K, V]]
+class TransactionalMap[K, V](initialOpt: Option[HashTrie[K, V]] = None) extends Transactional with scala.collection.mutable.Map[K, V] {
val uuid = UUID.newUuid.toString
- ref.swap(new HashTrie[K, V])
+ protected[this] lazy val ref = new TransactionalRef(initialOpt.orElse(Some(new HashTrie[K, V])))
def -=(key: K) = {
remove(key)
@@ -239,10 +271,17 @@ class TransactionalMap[K, V] extends Transactional with scala.collection.mutable
override def equals(other: Any): Boolean =
other.isInstanceOf[TransactionalMap[_, _]] &&
other.hashCode == hashCode
+
+ override def toString = if (outsideTransaction) "