From c2a156d514d78ef486b7e643978b8fa142ebd06f Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 5 Aug 2010 12:04:32 +0200 Subject: [PATCH] Race condition should be patched now --- .../src/main/scala/actor/ActorRegistry.scala | 34 +++++++++++++------ .../ExecutorBasedEventDrivenDispatcher.scala | 2 +- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index 63ddb939c7..905d1e7f2e 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -122,19 +122,30 @@ object ActorRegistry extends ListenerManagement { * Registers an actor in the ActorRegistry. */ def register(actor: ActorRef) = { - // UUID - actorsByUUID.put(actor.uuid, actor) - // ID val id = actor.id if (id eq null) throw new IllegalActorStateException("Actor.id is null " + actor) - if (actorsById.containsKey(id)) actorsById.get(id).add(actor) - else { - val set = new ConcurrentSkipListSet[ActorRef] - set.add(actor) - actorsById.put(id, set) + + //Tries to avoid a race-condition + def registerById(attemptNo: Int = 0,maxAttempts: Int = 10) { + if(attemptNo >= maxAttempts) + throw new IllegalStateException("Tried to add %s to the ActorRegistry %d times but failed.".format(actor,maxAttempts)) + val set = actorsById.get(id) + if(set ne null) + set.add(actor) + else { + val newSet = new ConcurrentSkipListSet[ActorRef] + newSet.add(actor) + if(actorsById.putIfAbsent(id,newSet) ne null) + registerById(attemptNo+1) + } } + registerById() + + // UUID + actorsByUUID.put(actor.uuid, actor) + // notify listeners foreachListener(_ ! ActorRegistered(actor)) } @@ -145,8 +156,11 @@ object ActorRegistry extends ListenerManagement { def unregister(actor: ActorRef) = { actorsByUUID remove actor.uuid - val id = actor.id - if (actorsById.containsKey(id)) actorsById.get(id).remove(actor) + val set = actorsById.get(actor.id) + if (set ne null) + set remove actor + + //FIXME: safely remove set if empty, leaks memory // notify listeners foreachListener(_ ! ActorUnregistered(actor)) diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 836dc0ea86..1f03c1eba2 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -82,7 +82,7 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat override def register(actorRef: ActorRef) = { // The actor will need a ConcurrentLinkedDeque based mailbox - if( actorRef.mailbox == null ) { + if( actorRef.mailbox eq null ) { actorRef.mailbox = new ConcurrentLinkedDeque[MessageInvocation]() } super.register(actorRef)