Race condition should be patched now
This commit is contained in:
parent
5168bb5cae
commit
c2a156d514
2 changed files with 25 additions and 11 deletions
|
|
@ -122,19 +122,30 @@ object ActorRegistry extends ListenerManagement {
|
|||
* Registers an actor in the ActorRegistry.
|
||||
*/
|
||||
def register(actor: ActorRef) = {
|
||||
// UUID
|
||||
actorsByUUID.put(actor.uuid, actor)
|
||||
|
||||
// ID
|
||||
val id = actor.id
|
||||
if (id eq null) throw new IllegalActorStateException("Actor.id is null " + actor)
|
||||
if (actorsById.containsKey(id)) actorsById.get(id).add(actor)
|
||||
else {
|
||||
val set = new ConcurrentSkipListSet[ActorRef]
|
||||
set.add(actor)
|
||||
actorsById.put(id, set)
|
||||
|
||||
//Tries to avoid a race-condition
|
||||
def registerById(attemptNo: Int = 0,maxAttempts: Int = 10) {
|
||||
if(attemptNo >= maxAttempts)
|
||||
throw new IllegalStateException("Tried to add %s to the ActorRegistry %d times but failed.".format(actor,maxAttempts))
|
||||
val set = actorsById.get(id)
|
||||
if(set ne null)
|
||||
set.add(actor)
|
||||
else {
|
||||
val newSet = new ConcurrentSkipListSet[ActorRef]
|
||||
newSet.add(actor)
|
||||
if(actorsById.putIfAbsent(id,newSet) ne null)
|
||||
registerById(attemptNo+1)
|
||||
}
|
||||
}
|
||||
|
||||
registerById()
|
||||
|
||||
// UUID
|
||||
actorsByUUID.put(actor.uuid, actor)
|
||||
|
||||
// notify listeners
|
||||
foreachListener(_ ! ActorRegistered(actor))
|
||||
}
|
||||
|
|
@ -145,8 +156,11 @@ object ActorRegistry extends ListenerManagement {
|
|||
def unregister(actor: ActorRef) = {
|
||||
actorsByUUID remove actor.uuid
|
||||
|
||||
val id = actor.id
|
||||
if (actorsById.containsKey(id)) actorsById.get(id).remove(actor)
|
||||
val set = actorsById.get(actor.id)
|
||||
if (set ne null)
|
||||
set remove actor
|
||||
|
||||
//FIXME: safely remove set if empty, leaks memory
|
||||
|
||||
// notify listeners
|
||||
foreachListener(_ ! ActorUnregistered(actor))
|
||||
|
|
|
|||
|
|
@ -82,7 +82,7 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat
|
|||
|
||||
override def register(actorRef: ActorRef) = {
|
||||
// The actor will need a ConcurrentLinkedDeque based mailbox
|
||||
if( actorRef.mailbox == null ) {
|
||||
if( actorRef.mailbox eq null ) {
|
||||
actorRef.mailbox = new ConcurrentLinkedDeque[MessageInvocation]()
|
||||
}
|
||||
super.register(actorRef)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue