Mid ActorID refactoring

This commit is contained in:
Jonas Bonér 2010-04-30 20:22:45 +02:00
parent f35ff945ef
commit 803838d315
10 changed files with 508 additions and 542 deletions

File diff suppressed because it is too large Load diff

View file

@ -11,8 +11,8 @@ import scala.reflect.Manifest
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
sealed trait ActorRegistryEvent
case class ActorRegistered(actor: Actor) extends ActorRegistryEvent
case class ActorUnregistered(actor: Actor) extends ActorRegistryEvent
case class ActorRegistered(actor: ActorID) extends ActorRegistryEvent
case class ActorUnregistered(actor: ActorID) extends ActorRegistryEvent
/**
* Registry holding all Actor instances in the whole system.
@ -27,16 +27,16 @@ case class ActorUnregistered(actor: Actor) extends ActorRegistryEvent
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ActorRegistry extends Logging {
private val actorsByUUID = new ConcurrentHashMap[String, Actor]
private val actorsById = new ConcurrentHashMap[String, List[Actor]]
private val actorsByClassName = new ConcurrentHashMap[String, List[Actor]]
private val registrationListeners = new CopyOnWriteArrayList[Actor]
private val actorsByUUID = new ConcurrentHashMap[String, ActorID]
private val actorsById = new ConcurrentHashMap[String, List[ActorID]]
private val actorsByClassName = new ConcurrentHashMap[String, List[ActorID]]
private val registrationListeners = new CopyOnWriteArrayList[ActorID]
/**
* Returns all actors in the system.
*/
def actors: List[Actor] = {
val all = new ListBuffer[Actor]
def actors: List[ActorID] = {
val all = new ListBuffer[ActorID]
val elements = actorsByUUID.elements
while (elements.hasMoreElements) all += elements.nextElement
all.toList
@ -45,7 +45,7 @@ object ActorRegistry extends Logging {
/**
* Invokes a function for all actors.
*/
def foreach(f: (Actor) => Unit) = {
def foreach(f: (ActorID) => Unit) = {
val elements = actorsByUUID.elements
while (elements.hasMoreElements) f(elements.nextElement)
}
@ -57,9 +57,9 @@ object ActorRegistry extends Logging {
val all = new ListBuffer[T]
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
val actor = elements.nextElement
if (manifest.erasure.isAssignableFrom(actor.getClass)) {
all += actor.asInstanceOf[T]
val actorId = elements.nextElement
if (manifest.erasure.isAssignableFrom(actorId.actor.getClass)) {
all += actorId.actor.asInstanceOf[T]
}
}
all.toList
@ -77,15 +77,15 @@ object ActorRegistry extends Logging {
/**
* Finds all actors that has a specific id.
*/
def actorsFor(id: String): List[Actor] = {
if (actorsById.containsKey(id)) actorsById.get(id).asInstanceOf[List[Actor]]
def actorsFor(id: String): List[ActorID] = {
if (actorsById.containsKey(id)) actorsById.get(id).asInstanceOf[List[ActorID]]
else Nil
}
/**
* Finds the actor that has a specific UUID.
*/
def actorFor(uuid: String): Option[Actor] = {
def actorFor(uuid: String): Option[ActorID] = {
if (actorsByUUID.containsKey(uuid)) Some(actorsByUUID.get(uuid))
else None
}
@ -93,7 +93,7 @@ object ActorRegistry extends Logging {
/**
* Registers an actor in the ActorRegistry.
*/
def register(actor: Actor) = {
def register(actor: ActorID) = {
// UUID
actorsByUUID.put(actor.uuid, actor)
@ -116,7 +116,7 @@ object ActorRegistry extends Logging {
/**
* Unregisters an actor in the ActorRegistry.
*/
def unregister(actor: Actor) = {
def unregister(actor: ActorID) = {
actorsByUUID remove actor.uuid
actorsById remove actor.getId
actorsByClassName remove actor.getClass.getName
@ -139,18 +139,18 @@ object ActorRegistry extends Logging {
/**
* Adds the registration <code>listener</code> this this registry's listener list.
*/
def addRegistrationListener(listener: Actor) = {
def addRegistrationListener(listener: ActorID) = {
registrationListeners.add(listener)
}
/**
* Removes the registration <code>listener</code> this this registry's listener list.
*/
def removeRegistrationListener(listener: Actor) = {
def removeRegistrationListener(listener: ActorID) = {
registrationListeners.remove(listener)
}
private def foreachListener(f: (Actor) => Unit) {
private def foreachListener(f: (ActorID) => Unit) {
val iterator = registrationListeners.iterator
while (iterator.hasNext) f(iterator.next)
}

View file

@ -84,7 +84,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
faultHandler = Some(handler)
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
private val actors = new ConcurrentHashMap[String, List[Actor]]
private val actors = new ConcurrentHashMap[String, List[ActorID]]
// Cheating, should really go through the dispatcher rather than direct access to a CHM
def getInstance[T](clazz: Class[T]): List[T] = actors.get(clazz.getName).asInstanceOf[List[T]]
@ -94,7 +94,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
def isDefined(clazz: Class[_]): Boolean = actors.containsKey(clazz.getName)
override def start: Actor = synchronized {
override def start: Unit = synchronized {
ConfiguratorRepository.registerConfigurator(this)
super[Actor].start
}
@ -117,31 +117,35 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
case SupervisorConfig(_, servers) =>
servers.map(server =>
server match {
case Supervise(actor, lifeCycle, remoteAddress) =>
val className = actor.getClass.getName
case Supervise(actorId, lifeCycle, remoteAddress) =>
val className = actorId.actor.getClass.getName
val currentActors = {
val list = actors.get(className)
if (list eq null) List[Actor]()
if (list eq null) List[ActorID]()
else list
}
actors.put(className, actor :: currentActors)
actor.lifeCycle = Some(lifeCycle)
startLink(actor)
actors.put(className, actorId :: currentActors)
actorId.actor.lifeCycle = Some(lifeCycle)
startLink(actorId)
remoteAddress.foreach(address => RemoteServer.actorsFor(
RemoteServer.Address(address.hostname, address.port))
.actors.put(actor.getId, actor))
.actors.put(actorId.getId, actorId))
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
val supervisor = factory.newInstanceFor(supervisorConfig).start
val supervisor = {
val instance = factory.newInstanceFor(supervisorConfig)
instance.start
instance
}
supervisor.lifeCycle = Some(LifeCycle(Permanent))
val className = supervisor.getClass.getName
val currentSupervisors = {
val list = actors.get(className)
if (list eq null) List[Actor]()
if (list eq null) List[ActorID]()
else list
}
actors.put(className, supervisor :: currentSupervisors)
link(supervisor)
actors.put(className, supervisor.selfId :: currentSupervisors)
link(supervisor.selfId)
})
}
}

View file

@ -4,7 +4,7 @@
package se.scalablesolutions.akka.config
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.{Actor, ActorID}
import se.scalablesolutions.akka.dispatch.MessageDispatcher
sealed abstract class FaultHandlingStrategy
@ -25,13 +25,13 @@ object ScalaConfig {
case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server
class Supervise(val actor: Actor, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server {
class Supervise(val actorId: ActorID, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server {
val remoteAddress: Option[RemoteAddress] = if (_remoteAddress eq null) None else Some(_remoteAddress)
}
object Supervise {
def apply(actor: Actor, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actor, lifeCycle, remoteAddress)
def apply(actor: Actor, lifeCycle: LifeCycle) = new Supervise(actor, lifeCycle, null)
def unapply(supervise: Supervise) = Some((supervise.actor, supervise.lifeCycle, supervise.remoteAddress))
def apply(actorId: ActorID, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorId, lifeCycle, remoteAddress)
def apply(actorId: ActorID, lifeCycle: LifeCycle) = new Supervise(actorId, lifeCycle, null)
def unapply(supervise: Supervise) = Some((supervise.actorId, supervise.lifeCycle, supervise.remoteAddress))
}
case class RestartStrategy(
@ -227,8 +227,8 @@ object JavaConfig {
intf, target, lifeCycle.transform, timeout, transactionRequired, dispatcher,
if (remoteAddress ne null) se.scalablesolutions.akka.config.ScalaConfig.RemoteAddress(remoteAddress.hostname, remoteAddress.port) else null)
def newSupervised(actor: Actor) =
se.scalablesolutions.akka.config.ScalaConfig.Supervise(actor, lifeCycle.transform)
def newSupervised(actorId: ActorID) =
se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorId, lifeCycle.transform)
}
}

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.dispatch
import java.util.{LinkedList, Queue, List}
import java.util.HashMap
import se.scalablesolutions.akka.actor.{ActorMessageInvoker, Actor}
import se.scalablesolutions.akka.actor.{ActorMessageInvoker, Actor, ActorID}
abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) extends MessageDispatcher {
@volatile protected var active: Boolean = false
@ -18,14 +18,14 @@ abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) exten
def dispatch(invocation: MessageInvocation) = queue.append(invocation)
override def register(actor: Actor) = synchronized {
messageInvokers.put(actor, new ActorMessageInvoker(actor))
super.register(actor)
override def register(actorId: ActorID) = synchronized {
messageInvokers.put(actorId, new ActorMessageInvoker(actorId))
super.register(actorId)
}
override def unregister(actor: Actor) = synchronized {
messageInvokers.remove(actor)
super.unregister(actor)
override def unregister(actorId: ActorID) = synchronized {
messageInvokers.remove(actorId)
super.unregister(actorId)
}
def shutdown = if (active) {

View file

@ -65,21 +65,23 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
var lockAcquiredOnce = false
// this do-wile loop is required to prevent missing new messages between the end of the inner while
// loop and releasing the lock
val lock = invocation.receiver.actor._dispatcherLock
val mailbox = invocation.receiver.actor._mailbox
do {
if (invocation.receiver._dispatcherLock.tryLock) {
if (lock.tryLock) {
lockAcquiredOnce = true
try {
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
var messageInvocation = invocation.receiver._mailbox.poll
var messageInvocation = mailbox.poll
while (messageInvocation != null) {
messageInvocation.invoke
messageInvocation = invocation.receiver._mailbox.poll
messageInvocation = mailbox.poll
}
} finally {
invocation.receiver._dispatcherLock.unlock
lock.unlock
}
}
} while ((lockAcquiredOnce && !invocation.receiver._mailbox.isEmpty))
} while ((lockAcquiredOnce && !mailbox.isEmpty))
}
})
} else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started")

View file

@ -5,7 +5,8 @@
package se.scalablesolutions.akka.dispatch
import java.util.concurrent.CopyOnWriteArrayList
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.{Actor, ActorID}
/**
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
@ -30,10 +31,12 @@ import se.scalablesolutions.akka.actor.Actor
class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends MessageDispatcher with ThreadPoolBuilder {
@volatile private var active: Boolean = false
implicit def actorId2actor(actorId: ActorID): Actor = actorId.actor
/** Type of the actors registered in this dispatcher. */
private var actorType:Option[Class[_]] = None
private val pooledActors = new CopyOnWriteArrayList[Actor]
private val pooledActors = new CopyOnWriteArrayList[ActorID]
/** The index in the pooled actors list which was last used to steal work */
@volatile private var lastThiefIndex = 0
@ -65,17 +68,18 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
*
* @return true if the mailbox was processed, false otherwise
*/
private def tryProcessMailbox(receiver: Actor): Boolean = {
private def tryProcessMailbox(receiver: ActorID): Boolean = {
var lockAcquiredOnce = false
val lock = receiver.actor._dispatcherLock
// this do-wile loop is required to prevent missing new messages between the end of processing
// the mailbox and releasing the lock
do {
if (receiver._dispatcherLock.tryLock) {
if (lock.tryLock) {
lockAcquiredOnce = true
try {
processMailbox(receiver)
} finally {
receiver._dispatcherLock.unlock
lock.unlock
}
}
} while ((lockAcquiredOnce && !receiver._mailbox.isEmpty))
@ -86,7 +90,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
/**
* Process the messages in the mailbox of the given actor.
*/
private def processMailbox(receiver: Actor) = {
private def processMailbox(receiver: ActorID) = {
var messageInvocation = receiver._mailbox.poll
while (messageInvocation != null) {
messageInvocation.invoke
@ -94,9 +98,9 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
}
}
private def findThief(receiver: Actor): Option[Actor] = {
private def findThief(receiver: ActorID): Option[ActorID] = {
// copy to prevent concurrent modifications having any impact
val actors = pooledActors.toArray(new Array[Actor](pooledActors.size))
val actors = pooledActors.toArray(new Array[ActorID](pooledActors.size))
var i = lastThiefIndex
if (i > actors.size)
i = 0
@ -104,7 +108,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
// we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means
// the dispatcher is being shut down...
doFindThief(receiver, actors, i) match {
case (thief: Option[Actor], index: Int) => {
case (thief: Option[ActorID], index: Int) => {
lastThiefIndex = (index + 1) % actors.size
return thief
}
@ -119,7 +123,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
* @param startIndex first index to start looking in the list (i.e. for round robin)
* @return the thief (or None) and the new index to start searching next time
*/
private def doFindThief(receiver: Actor, actors: Array[Actor], startIndex: Int): (Option[Actor], Int) = {
private def doFindThief(receiver: ActorID, actors: Array[ActorID], startIndex: Int): (Option[ActorID], Int) = {
for (i <- 0 to actors.length) {
val index = (i + startIndex) % actors.length
val actor = actors(index)
@ -136,7 +140,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
* Try donating messages to the thief and processing the thiefs mailbox. Doesn't do anything if we can not acquire
* the thiefs dispatching lock, because in that case another thread is already processing the thiefs mailbox.
*/
private def tryDonateAndProcessMessages(receiver: Actor, thief: Actor) = {
private def tryDonateAndProcessMessages(receiver: ActorID, thief: ActorID) = {
if (thief._dispatcherLock.tryLock) {
try {
donateAndProcessMessages(receiver, thief)
@ -149,7 +153,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
/**
* Donate messages to the thief and process them on the thief as long as the receiver has more messages.
*/
private def donateAndProcessMessages(receiver: Actor, thief: Actor): Unit = {
private def donateAndProcessMessages(receiver: ActorID, thief: ActorID): Unit = {
donateMessage(receiver, thief) match {
case None => {
// no more messages to donate
@ -165,10 +169,10 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
/**
* Steal a message from the receiver and give it to the thief.
*/
private def donateMessage(receiver: Actor, thief: Actor): Option[MessageInvocation] = {
private def donateMessage(receiver: ActorID, thief: ActorID): Option[MessageInvocation] = {
val donated = receiver._mailbox.pollLast
if (donated != null) {
thief ! donated.message
thief.selfId ! donated.message
return Some(donated)
} else return None
}
@ -189,29 +193,29 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
override def register(actor: Actor) = {
verifyActorsAreOfSameType(actor)
pooledActors.add(actor)
super.register(actor)
override def register(actorId: ActorID) = {
verifyActorsAreOfSameType(actorId)
pooledActors.add(actorId)
super.register(actorId)
}
override def unregister(actor: Actor) = {
pooledActors.remove(actor)
super.unregister(actor)
override def unregister(actorId: ActorID) = {
pooledActors.remove(actorId)
super.unregister(actorId)
}
def usesActorMailbox = true
private def verifyActorsAreOfSameType(newActor: Actor) = {
private def verifyActorsAreOfSameType(newActorId: ActorID) = {
actorType match {
case None => {
actorType = Some(newActor.getClass)
actorType = Some(newActorId.actor.getClass)
}
case Some(aType) => {
if (aType != newActor.getClass)
if (aType != newActorId.actor.getClass)
throw new IllegalStateException(
String.format("Can't register actor %s in a work stealing dispatcher which already knows actors of type %s",
newActor, aType))
newActorId.actor, aType))
}
}
}

View file

@ -7,25 +7,25 @@ package se.scalablesolutions.akka.dispatch
import java.util.List
import se.scalablesolutions.akka.util.{HashCode, Logging}
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.{Actor, ActorID}
import java.util.concurrent.ConcurrentHashMap
import org.multiverse.commitbarriers.CountDownCommitBarrier
final class MessageInvocation(val receiver: Actor,
final class MessageInvocation(val receiver: ActorID,
val message: Any,
val replyTo : Option[Either[Actor,CompletableFuture[Any]]],
val replyTo : Option[Either[ActorID, CompletableFuture[Any]]],
val transactionSet: Option[CountDownCommitBarrier]) {
if (receiver eq null) throw new IllegalArgumentException("receiver is null")
def invoke = receiver.invoke(this)
def invoke = receiver.actor.invoke(this)
def send = receiver.dispatcher.dispatch(this)
override def hashCode(): Int = synchronized {
var result = HashCode.SEED
result = HashCode.hash(result, receiver)
result = HashCode.hash(result, receiver.actor)
result = HashCode.hash(result, message.asInstanceOf[AnyRef])
result
}
@ -33,7 +33,7 @@ final class MessageInvocation(val receiver: Actor,
override def equals(that: Any): Boolean = synchronized {
that != null &&
that.isInstanceOf[MessageInvocation] &&
that.asInstanceOf[MessageInvocation].receiver == receiver &&
that.asInstanceOf[MessageInvocation].receiver.actor == receiver.actor &&
that.asInstanceOf[MessageInvocation].message == message
}
@ -56,13 +56,13 @@ trait MessageInvoker {
}
trait MessageDispatcher extends Logging {
protected val references = new ConcurrentHashMap[String, Actor]
protected val references = new ConcurrentHashMap[String, ActorID]
def dispatch(invocation: MessageInvocation)
def start
def shutdown
def register(actor: Actor) = references.put(actor.uuid, actor)
def unregister(actor: Actor) = {
references.remove(actor.uuid)
def register(actorId: ActorID) = references.put(actorId.uuid, actorId)
def unregister(actorId: ActorID) = {
references.remove(actorId.uuid)
if (canBeShutDown)
shutdown // shut down in the dispatcher's references is zero
}

View file

@ -212,19 +212,19 @@ abstract class BasicClusterActor extends ClusterActor with Logging {
* Registers a local endpoint
*/
def registerLocalNode(hostname: String, port: Int): Unit =
this ! RegisterLocalNode(RemoteAddress(hostname, port))
selfId ! RegisterLocalNode(RemoteAddress(hostname, port))
/**
* Deregisters a local endpoint
*/
def deregisterLocalNode(hostname: String, port: Int): Unit =
this ! DeregisterLocalNode(RemoteAddress(hostname, port))
selfId ! DeregisterLocalNode(RemoteAddress(hostname, port))
/**
* Broadcasts the specified message to all Actors of type Class on all known Nodes
*/
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit =
this ! RelayedMessage(to.getName, msg)
selfId ! RelayedMessage(to.getName, msg)
}
/**
@ -261,7 +261,7 @@ object Cluster extends Cluster with Logging {
val sup = SupervisorFactory(
SupervisorConfig(
RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])),
Supervise(actor, LifeCycle(Permanent)) :: Nil)
Supervise(actor.selfId, LifeCycle(Permanent)) :: Nil)
).newInstance
Some(sup)
}

View file

@ -89,7 +89,7 @@ object RemoteServer {
}
class RemoteActorSet {
val actors = new ConcurrentHashMap[String, Actor]
val actors = new ConcurrentHashMap[String, ActorID]
val activeObjects = new ConcurrentHashMap[String, AnyRef]
}