From 8b9a56e89ce68da697ed6e93dfe1cce2fbbbd8e2 Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Sun, 10 Jul 2011 09:58:42 +0300 Subject: [PATCH] Index is moved to the akka.util package --- .../main/scala/akka/actor/ActorRegistry.scala | 117 +---------------- .../src/main/scala/akka/util/Index.scala | 124 ++++++++++++++++++ .../remote/netty/NettyRemoteSupport.scala | 1 - 3 files changed, 125 insertions(+), 117 deletions(-) create mode 100644 akka-actor/src/main/scala/akka/util/Index.scala diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala index 99ca079646..9dafb5a90e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala @@ -11,6 +11,7 @@ import annotation.tailrec import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap } import java.util.{ Set ⇒ JSet } +import akka.util.Index import akka.util.ReflectiveAccess._ import akka.util.ListenerManagement import akka.serialization._ @@ -254,119 +255,3 @@ class LocalActorRegistry( private def typedActorFor(actorRef: ActorRef): Option[AnyRef] = typedActorFor(actorRef.uuid) } - -/** - * FIXME move Index to its own file and put in akka.util. - * - * An implementation of a ConcurrentMultiMap - * Adds/remove is serialized over the specified key - * Reads are fully concurrent <-- el-cheapo - * - * @author Viktor Klang - */ -class Index[K <: AnyRef, V <: AnyRef: Manifest] { - private val Naught = Array[V]() //Nil for Arrays - private val container = new ConcurrentHashMap[K, JSet[V]] - private val emptySet = new ConcurrentSkipListSet[V] - - /** - * Associates the value of type V with the key of type K - * @return true if the value didn't exist for the key previously, and false otherwise - */ - def put(key: K, value: V): Boolean = { - //Tailrecursive spin-locking put - @tailrec - def spinPut(k: K, v: V): Boolean = { - var retry = false - var added = false - val set = container get k - - if (set ne null) { - set.synchronized { - if (set.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry - else { //Else add the value to the set and signal that retry is not needed - added = set add v - retry = false - } - } - } else { - val newSet = new ConcurrentSkipListSet[V] - newSet add v - - // Parry for two simultaneous putIfAbsent(id,newSet) - val oldSet = container.putIfAbsent(k, newSet) - if (oldSet ne null) { - oldSet.synchronized { - if (oldSet.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry - else { //Else try to add the value to the set and signal that retry is not needed - added = oldSet add v - retry = false - } - } - } else added = true - } - - if (retry) spinPut(k, v) - else added - } - - spinPut(key, value) - } - - /** - * @return a _new_ array of all existing values for the given key at the time of the call - */ - def values(key: K): Array[V] = { - val set: JSet[V] = container get key - val result = if (set ne null) set toArray Naught else Naught - result.asInstanceOf[Array[V]] - } - - /** - * @return Some(value) for the first matching value where the supplied function returns true for the given key, - * if no matches it returns None - */ - def findValue(key: K)(f: (V) ⇒ Boolean): Option[V] = { - import scala.collection.JavaConversions._ - val set = container get key - if (set ne null) set.iterator.find(f) - else None - } - - /** - * Applies the supplied function to all keys and their values - */ - def foreach(fun: (K, V) ⇒ Unit) { - import scala.collection.JavaConversions._ - container.entrySet foreach { e ⇒ e.getValue.foreach(fun(e.getKey, _)) } - } - - /** - * Disassociates the value of type V from the key of type K - * @return true if the value was disassociated from the key and false if it wasn't previously associated with the key - */ - def remove(key: K, value: V): Boolean = { - val set = container get key - - if (set ne null) { - set.synchronized { - if (set.remove(value)) { //If we can remove the value - if (set.isEmpty) //and the set becomes empty - container.remove(key, emptySet) //We try to remove the key if it's mapped to an empty set - - true //Remove succeeded - } else false //Remove failed - } - } else false //Remove failed - } - - /** - * @return true if the underlying containers is empty, may report false negatives when the last remove is underway - */ - def isEmpty: Boolean = container.isEmpty - - /** - * Removes all keys and all values - */ - def clear = foreach { case (k, v) ⇒ remove(k, v) } -} diff --git a/akka-actor/src/main/scala/akka/util/Index.scala b/akka-actor/src/main/scala/akka/util/Index.scala new file mode 100644 index 0000000000..d7df32efd6 --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/Index.scala @@ -0,0 +1,124 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.util + +import annotation.tailrec + +import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap } +import java.util.{ Set ⇒ JSet } + +/** + * An implementation of a ConcurrentMultiMap + * Adds/remove is serialized over the specified key + * Reads are fully concurrent <-- el-cheapo + * + * @author Viktor Klang + */ +class Index[K <: AnyRef, V <: AnyRef: Manifest] { + private val Naught = Array[V]() //Nil for Arrays + private val container = new ConcurrentHashMap[K, JSet[V]] + private val emptySet = new ConcurrentSkipListSet[V] + + /** + * Associates the value of type V with the key of type K + * @return true if the value didn't exist for the key previously, and false otherwise + */ + def put(key: K, value: V): Boolean = { + //Tailrecursive spin-locking put + @tailrec + def spinPut(k: K, v: V): Boolean = { + var retry = false + var added = false + val set = container get k + + if (set ne null) { + set.synchronized { + if (set.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry + else { //Else add the value to the set and signal that retry is not needed + added = set add v + retry = false + } + } + } else { + val newSet = new ConcurrentSkipListSet[V] + newSet add v + + // Parry for two simultaneous putIfAbsent(id,newSet) + val oldSet = container.putIfAbsent(k, newSet) + if (oldSet ne null) { + oldSet.synchronized { + if (oldSet.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry + else { //Else try to add the value to the set and signal that retry is not needed + added = oldSet add v + retry = false + } + } + } else added = true + } + + if (retry) spinPut(k, v) + else added + } + + spinPut(key, value) + } + + /** + * @return a _new_ array of all existing values for the given key at the time of the call + */ + def values(key: K): Array[V] = { + val set: JSet[V] = container get key + val result = if (set ne null) set toArray Naught else Naught + result.asInstanceOf[Array[V]] + } + + /** + * @return Some(value) for the first matching value where the supplied function returns true for the given key, + * if no matches it returns None + */ + def findValue(key: K)(f: (V) ⇒ Boolean): Option[V] = { + import scala.collection.JavaConversions._ + val set = container get key + if (set ne null) set.iterator.find(f) + else None + } + + /** + * Applies the supplied function to all keys and their values + */ + def foreach(fun: (K, V) ⇒ Unit) { + import scala.collection.JavaConversions._ + container.entrySet foreach { e ⇒ e.getValue.foreach(fun(e.getKey, _)) } + } + + /** + * Disassociates the value of type V from the key of type K + * @return true if the value was disassociated from the key and false if it wasn't previously associated with the key + */ + def remove(key: K, value: V): Boolean = { + val set = container get key + + if (set ne null) { + set.synchronized { + if (set.remove(value)) { //If we can remove the value + if (set.isEmpty) //and the set becomes empty + container.remove(key, emptySet) //We try to remove the key if it's mapped to an empty set + + true //Remove succeeded + } else false //Remove failed + } + } else false //Remove failed + } + + /** + * @return true if the underlying containers is empty, may report false negatives when the last remove is underway + */ + def isEmpty: Boolean = container.isEmpty + + /** + * Removes all keys and all values + */ + def clear = foreach { case (k, v) ⇒ remove(k, v) } +} \ No newline at end of file diff --git a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index b35a1221fe..3dc59daec0 100644 --- a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -12,7 +12,6 @@ import akka.serialization.RemoteActorSerialization._ import akka.remoteinterface._ import akka.actor.{ PoisonPill, - Index, LocalActorRef, Actor, RemoteActorRef,