cleaned up actor field access modifiers and prefixed internal fields with _ to avoid name clashes
This commit is contained in:
parent
ffbe3fbb6b
commit
3d7eecb243
6 changed files with 88 additions and 78 deletions
|
|
@ -20,6 +20,7 @@ import se.scalablesolutions.akka.util.Helpers.ReadWriteLock
|
|||
import se.scalablesolutions.akka.util.Logging
|
||||
|
||||
import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint}
|
||||
import org.codehaus.aspectwerkz.proxy.Uuid
|
||||
|
||||
import org.multiverse.utils.ThreadLocalTransaction._
|
||||
|
||||
|
|
@ -54,28 +55,25 @@ object Actor {
|
|||
}
|
||||
|
||||
/**
|
||||
* Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model'
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait Actor extends Logging with TransactionManagement {
|
||||
ActorRegistry.register(this)
|
||||
|
||||
// FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait
|
||||
val uuid = Uuid.newUuid.toString
|
||||
|
||||
@volatile private[this] var isRunning: Boolean = false
|
||||
private[this] val remoteFlagLock = new ReadWriteLock
|
||||
private[this] val transactionalFlagLock = new ReadWriteLock
|
||||
|
||||
private var hotswap: Option[PartialFunction[Any, Unit]] = None
|
||||
private var config: Option[AnyRef] = None
|
||||
|
||||
@volatile protected[this] var isTransactionRequiresNew = false
|
||||
@volatile protected[this] var remoteAddress: Option[InetSocketAddress] = None
|
||||
@volatile protected[akka] var supervisor: Option[Actor] = None
|
||||
|
||||
protected[akka] var mailbox: MessageQueue = _
|
||||
protected[this] var senderFuture: Option[CompletableFutureResult] = None
|
||||
protected[this] val linkedActors = new HashSet[Actor]
|
||||
protected[actor] var lifeCycleConfig: Option[LifeCycle] = None
|
||||
|
||||
val name = this.getClass.getName
|
||||
// private fields
|
||||
@volatile private var _isRunning: Boolean = false
|
||||
private var _hotswap: Option[PartialFunction[Any, Unit]] = None
|
||||
private var _config: Option[AnyRef] = None
|
||||
private val _remoteFlagLock = new ReadWriteLock
|
||||
private var _senderFuture: Option[CompletableFutureResult] = None
|
||||
private var _remoteAddress: Option[InetSocketAddress] = None
|
||||
private[akka] val _linkedActors = new HashSet[Actor]
|
||||
private[akka] var _mailbox: MessageQueue = _
|
||||
private[akka] var _supervisor: Option[Actor] = None
|
||||
|
||||
// ====================================
|
||||
// ==== USER CALLBACKS TO OVERRIDE ====
|
||||
|
|
@ -88,6 +86,15 @@ trait Actor extends Logging with TransactionManagement {
|
|||
*/
|
||||
@volatile var timeout: Long = Actor.TIMEOUT
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
*
|
||||
* Defines the life-cycle for a supervised actor.
|
||||
*
|
||||
* Needs to be set if the actor is supervised programmatically.
|
||||
*/
|
||||
@volatile var lifeCycleConfig: Option[LifeCycle] = None
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
*
|
||||
|
|
@ -110,7 +117,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
*/
|
||||
protected[akka] var messageDispatcher: MessageDispatcher = {
|
||||
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher(getClass.getName)
|
||||
mailbox = dispatcher.messageQueue
|
||||
_mailbox = dispatcher.messageQueue
|
||||
dispatcher.registerHandler(this, new ActorMessageInvoker(this))
|
||||
dispatcher
|
||||
}
|
||||
|
|
@ -129,6 +136,14 @@ trait Actor extends Logging with TransactionManagement {
|
|||
*/
|
||||
protected[this] var trapExit: Boolean = false
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
*
|
||||
* Set to true if messages should have REQUIRES_NEW semantics, e.g. a new transaction should
|
||||
* start if there is no one running, else it joins the existing transaction.
|
||||
*/
|
||||
@volatile protected var isTransactionRequiresNew = false
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
*
|
||||
|
|
@ -213,9 +228,9 @@ trait Actor extends Logging with TransactionManagement {
|
|||
* Starts up the actor and its message queue.
|
||||
*/
|
||||
def start = synchronized {
|
||||
if (!isRunning) {
|
||||
if (!_isRunning) {
|
||||
messageDispatcher.start
|
||||
isRunning = true
|
||||
_isRunning = true
|
||||
//if (isTransactional) this !! TransactionalInit
|
||||
}
|
||||
log.info("[%s] has started", toString)
|
||||
|
|
@ -225,11 +240,11 @@ trait Actor extends Logging with TransactionManagement {
|
|||
* Stops the actor and its message queue.
|
||||
*/
|
||||
def stop = synchronized {
|
||||
if (isRunning) {
|
||||
if (_isRunning) {
|
||||
dispatcher.unregisterHandler(this)
|
||||
if (dispatcher.isInstanceOf[ThreadBasedDispatcher]) dispatcher.shutdown
|
||||
// FIXME: Need to do reference count to know if EventBasedThreadPoolDispatcher and EventBasedSingleThreadDispatcher can be shut down
|
||||
isRunning = false
|
||||
_isRunning = false
|
||||
shutdown
|
||||
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
}
|
||||
|
|
@ -238,7 +253,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
* Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
|
||||
*/
|
||||
def !(message: AnyRef) =
|
||||
if (isRunning) postMessageToMailbox(message)
|
||||
if (_isRunning) postMessageToMailbox(message)
|
||||
else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
|
||||
/**
|
||||
|
|
@ -251,7 +266,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>reply(..)</code>
|
||||
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
||||
*/
|
||||
def !: Option[T] = if (isRunning) {
|
||||
def !: Option[T] = if (_isRunning) {
|
||||
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
|
||||
val isActiveObject = message.isInstanceOf[Invocation]
|
||||
if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.completeWithResult(None)
|
||||
|
|
@ -283,7 +298,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
* <b>NOTE:</b>
|
||||
* Should be used with care (almost never), since very dangerous (will block a thread indefinitely if no reply).
|
||||
*/
|
||||
def !?[T](message: AnyRef): T = if (isRunning) {
|
||||
def !?[T](message: AnyRef): T = if (_isRunning) {
|
||||
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, 0)
|
||||
future.awaitBlocking
|
||||
getResultOrThrowException(future).get
|
||||
|
|
@ -297,7 +312,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
* Does only work together with the actor <code>!!</code> method and/or active objects not annotated
|
||||
* with <code>@oneway</code>.
|
||||
*/
|
||||
protected[this] def reply(message: AnyRef) = senderFuture match {
|
||||
protected[this] def reply(message: AnyRef) = _senderFuture match {
|
||||
case None => throw new IllegalStateException(
|
||||
"\n\tNo sender in scope, can't reply. " +
|
||||
"\n\tHave you used the '!' message send or the '@oneway' active object annotation? " +
|
||||
|
|
@ -311,9 +326,9 @@ trait Actor extends Logging with TransactionManagement {
|
|||
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
|
||||
*/
|
||||
def dispatcher_=(dispatcher: MessageDispatcher): Unit = synchronized {
|
||||
if (!isRunning) {
|
||||
if (!_isRunning) {
|
||||
messageDispatcher = dispatcher
|
||||
mailbox = messageDispatcher.messageQueue
|
||||
_mailbox = messageDispatcher.messageQueue
|
||||
messageDispatcher.registerHandler(this, new ActorMessageInvoker(this))
|
||||
} else throw new IllegalArgumentException("Can not swap dispatcher for " + toString + " after it has been started")
|
||||
}
|
||||
|
|
@ -321,15 +336,15 @@ trait Actor extends Logging with TransactionManagement {
|
|||
/**
|
||||
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
|
||||
*/
|
||||
def makeRemote(hostname: String, port: Int): Unit = remoteFlagLock.withWriteLock {
|
||||
def makeRemote(hostname: String, port: Int): Unit = _remoteFlagLock.withWriteLock {
|
||||
makeRemote(new InetSocketAddress(hostname, port))
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
|
||||
*/
|
||||
def makeRemote(address: InetSocketAddress): Unit = remoteFlagLock.withWriteLock {
|
||||
remoteAddress = Some(address)
|
||||
def makeRemote(address: InetSocketAddress): Unit = _remoteFlagLock.withWriteLock {
|
||||
_remoteAddress = Some(address)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -341,7 +356,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
* </pre>
|
||||
*/
|
||||
def makeTransactionRequired = synchronized {
|
||||
if (isRunning) throw new IllegalArgumentException("Can not make actor transaction required after it has been started")
|
||||
if (_isRunning) throw new IllegalArgumentException("Can not make actor transaction required after it has been started")
|
||||
else isTransactionRequiresNew = true
|
||||
}
|
||||
|
||||
|
|
@ -352,10 +367,10 @@ trait Actor extends Logging with TransactionManagement {
|
|||
* To be invoked from within the actor itself.
|
||||
*/
|
||||
protected[this] def link(actor: Actor) = {
|
||||
if (isRunning) {
|
||||
linkedActors.add(actor)
|
||||
if (actor.supervisor.isDefined) throw new IllegalStateException("Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails")
|
||||
actor.supervisor = Some(this)
|
||||
if (_isRunning) {
|
||||
_linkedActors.add(actor)
|
||||
if (actor._supervisor.isDefined) throw new IllegalStateException("Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails")
|
||||
actor._supervisor = Some(this)
|
||||
log.debug("Linking actor [%s] to actor [%s]", actor, this)
|
||||
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
}
|
||||
|
|
@ -366,10 +381,10 @@ trait Actor extends Logging with TransactionManagement {
|
|||
* To be invoked from within the actor itself.
|
||||
*/
|
||||
protected[this] def unlink(actor: Actor) = {
|
||||
if (isRunning) {
|
||||
if (!linkedActors.contains(actor)) throw new IllegalStateException("Actor [" + actor + "] is not a linked actor, can't unlink")
|
||||
linkedActors.remove(actor)
|
||||
actor.supervisor = None
|
||||
if (_isRunning) {
|
||||
if (!_linkedActors.contains(actor)) throw new IllegalStateException("Actor [" + actor + "] is not a linked actor, can't unlink")
|
||||
_linkedActors.remove(actor)
|
||||
actor._supervisor = None
|
||||
log.debug("Unlinking actor [%s] from actor [%s]", actor, this)
|
||||
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
}
|
||||
|
|
@ -404,7 +419,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
val actor = actorClass.newInstance.asInstanceOf[T]
|
||||
if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) {
|
||||
actor.dispatcher = dispatcher
|
||||
actor.mailbox = mailbox
|
||||
actor._mailbox = _mailbox
|
||||
}
|
||||
actor.start
|
||||
actor
|
||||
|
|
@ -420,7 +435,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
actor.makeRemote(hostname, port)
|
||||
if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) {
|
||||
actor.dispatcher = dispatcher
|
||||
actor.mailbox = mailbox
|
||||
actor._mailbox = _mailbox
|
||||
}
|
||||
actor.start
|
||||
actor
|
||||
|
|
@ -453,8 +468,8 @@ trait Actor extends Logging with TransactionManagement {
|
|||
// ==== IMPLEMENTATION DETAILS ====
|
||||
// ================================
|
||||
|
||||
private def postMessageToMailbox(message: AnyRef): Unit = remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
|
||||
if (remoteAddress.isDefined) {
|
||||
private def postMessageToMailbox(message: AnyRef): Unit = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
|
||||
if (_remoteAddress.isDefined) {
|
||||
val requestBuilder = RemoteRequest.newBuilder
|
||||
.setId(RemoteRequestIdFactory.nextId)
|
||||
.setTarget(this.getClass.getName)
|
||||
|
|
@ -465,15 +480,15 @@ trait Actor extends Logging with TransactionManagement {
|
|||
val id = registerSupervisorAsRemoteActor
|
||||
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
|
||||
RemoteProtocolBuilder.setMessage(message, requestBuilder)
|
||||
RemoteClient.clientFor(remoteAddress.get).send(requestBuilder.build)
|
||||
RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build)
|
||||
} else {
|
||||
val handle = new MessageInvocation(this, message, None, currentTransaction.get)
|
||||
handle.send
|
||||
}
|
||||
}
|
||||
|
||||
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
|
||||
if (remoteAddress.isDefined) {
|
||||
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
|
||||
if (_remoteAddress.isDefined) {
|
||||
val requestBuilder = RemoteRequest.newBuilder
|
||||
.setId(RemoteRequestIdFactory.nextId)
|
||||
.setTarget(this.getClass.getName)
|
||||
|
|
@ -484,7 +499,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
RemoteProtocolBuilder.setMessage(message, requestBuilder)
|
||||
val id = registerSupervisorAsRemoteActor
|
||||
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
|
||||
val future = RemoteClient.clientFor(remoteAddress.get).send(requestBuilder.build)
|
||||
val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build)
|
||||
if (future.isDefined) future.get
|
||||
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
|
||||
} else {
|
||||
|
|
@ -509,13 +524,13 @@ trait Actor extends Logging with TransactionManagement {
|
|||
val message = messageHandle.message //serializeMessage(messageHandle.message)
|
||||
val future = messageHandle.future
|
||||
try {
|
||||
senderFuture = future
|
||||
_senderFuture = future
|
||||
if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
|
||||
else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString)
|
||||
} catch {
|
||||
case e =>
|
||||
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
|
||||
if (supervisor.isDefined) supervisor.get ! Exit(this, e)
|
||||
if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
|
||||
if (future.isDefined) future.get.completeWithException(this, e)
|
||||
else e.printStackTrace
|
||||
} finally {
|
||||
|
|
@ -540,9 +555,9 @@ trait Actor extends Logging with TransactionManagement {
|
|||
}
|
||||
|
||||
try {
|
||||
senderFuture = future
|
||||
_senderFuture = future
|
||||
if (isTransactionRequiresNew && !isTransactionInScope) {
|
||||
if (senderFuture.isEmpty) throw new StmException(
|
||||
if (_senderFuture.isEmpty) throw new StmException(
|
||||
"\n\tCan't continue transaction in a one-way fire-forget message send" +
|
||||
"\n\tE.g. using Actor '!' method or Active Object 'void' method" +
|
||||
"\n\tPlease use the Actor '!!', '!?' methods or Active Object method with non-void return type")
|
||||
|
|
@ -560,7 +575,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
clearTransaction // need to clear currentTransaction before call to supervisor
|
||||
|
||||
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
|
||||
if (supervisor.isDefined) supervisor.get ! Exit(this, e)
|
||||
if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
|
||||
} finally {
|
||||
clearTransaction
|
||||
}
|
||||
|
|
@ -570,11 +585,11 @@ trait Actor extends Logging with TransactionManagement {
|
|||
if (future.exception.isDefined) throw future.exception.get._2
|
||||
else future.result.asInstanceOf[Option[T]]
|
||||
|
||||
private def base: PartialFunction[Any, Unit] = lifeCycle orElse (hotswap getOrElse receive)
|
||||
private def base: PartialFunction[Any, Unit] = lifeCycle orElse (_hotswap getOrElse receive)
|
||||
|
||||
private val lifeCycle: PartialFunction[Any, Unit] = {
|
||||
case Init(config) => init(config)
|
||||
case HotSwap(code) => hotswap = code
|
||||
case HotSwap(code) => _hotswap = code
|
||||
case Restart(reason) => restart(reason)
|
||||
case Exit(dead, reason) => handleTrapExit(dead, reason)
|
||||
// case TransactionalInit => initTransactionalState
|
||||
|
|
@ -590,12 +605,12 @@ trait Actor extends Logging with TransactionManagement {
|
|||
}
|
||||
} else throw new IllegalStateException("No 'faultHandler' defined for actor with the 'trapExit' flag set to true - can't proceed " + toString)
|
||||
} else {
|
||||
if (supervisor.isDefined) supervisor.get ! Exit(dead, reason) // if 'trapExit' is not defined then pass the Exit on
|
||||
if (_supervisor.isDefined) _supervisor.get ! Exit(dead, reason) // if 'trapExit' is not defined then pass the Exit on
|
||||
}
|
||||
}
|
||||
|
||||
private[this] def restartLinkedActors(reason: AnyRef) =
|
||||
linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach(_.restart(reason))
|
||||
_linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach(_.restart(reason))
|
||||
|
||||
private[Actor] def restart(reason: AnyRef) = synchronized {
|
||||
lifeCycleConfig match {
|
||||
|
|
@ -605,9 +620,9 @@ trait Actor extends Logging with TransactionManagement {
|
|||
case Some(LifeCycle(scope, shutdownTime, _)) => {
|
||||
scope match {
|
||||
case Permanent => {
|
||||
preRestart(reason, config)
|
||||
preRestart(reason, _config)
|
||||
log.info("Restarting actor [%s] configured as PERMANENT.", id)
|
||||
postRestart(reason, config)
|
||||
postRestart(reason, _config)
|
||||
}
|
||||
|
||||
case Temporary =>
|
||||
|
|
@ -626,16 +641,16 @@ trait Actor extends Logging with TransactionManagement {
|
|||
}
|
||||
|
||||
private[akka] def registerSupervisorAsRemoteActor: Option[String] = synchronized {
|
||||
if (supervisor.isDefined) {
|
||||
RemoteClient.clientFor(remoteAddress.get).registerSupervisorForActor(this)
|
||||
Some(supervisor.get.uuid)
|
||||
if (_supervisor.isDefined) {
|
||||
RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(this)
|
||||
Some(_supervisor.get.uuid)
|
||||
} else None
|
||||
}
|
||||
|
||||
|
||||
private[akka] def swapDispatcher(disp: MessageDispatcher) = synchronized {
|
||||
messageDispatcher = disp
|
||||
mailbox = messageDispatcher.messageQueue
|
||||
_mailbox = messageDispatcher.messageQueue
|
||||
messageDispatcher.registerHandler(this, new ActorMessageInvoker(this))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -127,10 +127,10 @@ class Supervisor private[akka] (handler: FaultHandlingStrategy) extends Actor wi
|
|||
|
||||
protected def receive: PartialFunction[Any, Unit] = {
|
||||
case StartSupervisor =>
|
||||
linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor => actor.start; log.info("Starting actor: %s", actor) }
|
||||
_linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor => actor.start; log.info("Starting actor: %s", actor) }
|
||||
|
||||
case StopSupervisor =>
|
||||
linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor => actor.stop; log.info("Stopping actor: %s", actor) }
|
||||
_linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor => actor.stop; log.info("Stopping actor: %s", actor) }
|
||||
log.info("Stopping supervisor: %s", this)
|
||||
stop
|
||||
}
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ class MessageInvocation(val receiver: Actor,
|
|||
private [akka] val nrOfDeliveryAttempts = new AtomicInteger(0)
|
||||
|
||||
def send = synchronized {
|
||||
receiver.mailbox.append(this)
|
||||
receiver._mailbox.append(this)
|
||||
nrOfDeliveryAttempts.incrementAndGet
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -108,12 +108,12 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
|
|||
} else throw new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.")
|
||||
|
||||
def registerSupervisorForActor(actor: Actor) =
|
||||
if (!actor.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actor + " since it is not under supervision")
|
||||
else supervisors.putIfAbsent(actor.supervisor.get.uuid, actor)
|
||||
if (!actor._supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actor + " since it is not under supervision")
|
||||
else supervisors.putIfAbsent(actor._supervisor.get.uuid, actor)
|
||||
|
||||
def deregisterSupervisorForActor(actor: Actor) =
|
||||
if (!actor.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actor + " since it is not under supervision")
|
||||
else supervisors.remove(actor.supervisor.get.uuid)
|
||||
if (!actor._supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actor + " since it is not under supervision")
|
||||
else supervisors.remove(actor._supervisor.get.uuid)
|
||||
|
||||
def deregisterSupervisorWithUuid(uuid: String) = supervisors.remove(uuid)
|
||||
}
|
||||
|
|
@ -169,8 +169,8 @@ class RemoteClientHandler(val name: String,
|
|||
val supervisorUuid = reply.getSupervisorUuid
|
||||
if (!supervisors.containsKey(supervisorUuid)) throw new IllegalStateException("Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found")
|
||||
val supervisedActor = supervisors.get(supervisorUuid)
|
||||
if (!supervisedActor.supervisor.isDefined) throw new IllegalStateException("Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
|
||||
else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply))
|
||||
if (!supervisedActor._supervisor.isDefined) throw new IllegalStateException("Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
|
||||
else supervisedActor._supervisor.get ! Exit(supervisedActor, parseException(reply))
|
||||
}
|
||||
future.completeWithException(null, parseException(reply))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,8 +8,6 @@ import java.util.concurrent.atomic.AtomicBoolean
|
|||
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
|
||||
import org.codehaus.aspectwerkz.proxy.Uuid
|
||||
|
||||
import scala.collection.mutable.HashSet
|
||||
|
||||
import org.multiverse.utils.ThreadLocalTransaction._
|
||||
|
|
@ -38,9 +36,6 @@ object TransactionManagement extends TransactionManagement {
|
|||
}
|
||||
|
||||
trait TransactionManagement extends Logging {
|
||||
// FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait
|
||||
var uuid = Uuid.newUuid.toString
|
||||
|
||||
import TransactionManagement.currentTransaction
|
||||
private[akka] val activeTransactions = new HashSet[Transaction]
|
||||
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ class EventBasedSingleThreadActorTest extends JUnitSuite {
|
|||
private val unit = TimeUnit.MILLISECONDS
|
||||
|
||||
class TestActor extends Actor {
|
||||
dispatcher = Dispatchers.newEventBasedSingleThreadDispatcher(name)
|
||||
dispatcher = Dispatchers.newEventBasedSingleThreadDispatcher(uuid)
|
||||
|
||||
def receive: PartialFunction[Any, Unit] = {
|
||||
case "Hello" =>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue