+cdd #16799 Add ORMultiMap data type

This commit is contained in:
Christopher Hunt 2015-06-18 16:17:53 +02:00 committed by Patrik Nordwall
parent cbe5dd2cf5
commit 7041c76ba9
12 changed files with 2083 additions and 50 deletions

View file

@ -86,7 +86,15 @@ message PNCounterMap {
repeated Entry entries = 2;
}
message ORMultiMap {
message Entry {
required string key = 1;
required ORSet value = 2;
}
required ORSet keys = 1;
repeated Entry entries = 2;
}

View file

@ -87,7 +87,7 @@ final class ORMap[A <: ReplicatedData] private[akka] (
*
* `IllegalArgumentException` is thrown if you try to replace an existing `ORSet`
* value, because important history can be lost when replacing the `ORSet` and
* undesired effects of merging will occur.
* undesired effects of merging will occur. Use [[ORMultiMap]] or [[#updated]] instead.
*/
def put(node: Cluster, key: String, value: A): ORMap[A] = put(node.selfUniqueAddress, key, value)
@ -99,7 +99,7 @@ final class ORMap[A <: ReplicatedData] private[akka] (
throw new IllegalArgumentException(
"`ORMap.put` must not be used to replace an existing `ORSet` " +
"value, because important history can be lost when replacing the `ORSet` and " +
"undesired effects of merging will occur. Use `ORMap.updated` instead.")
"undesired effects of merging will occur. Use `ORMultiMap` or `ORMap.updated` instead.")
else
new ORMap(keys.add(node, key), values.updated(key, value))

View file

@ -0,0 +1,233 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.ddata
import akka.cluster.{ UniqueAddress, Cluster }
object ORMultiMap {
val _empty: ORMultiMap[Any] = new ORMultiMap(ORMap.empty)
/**
* Provides an empty multimap.
*/
def empty[A]: ORMultiMap[A] = _empty.asInstanceOf[ORMultiMap[A]]
def apply(): ORMultiMap[Any] = _empty
/**
* Java API
*/
def create[A](): ORMultiMap[A] = empty[A]
/**
* Extract the [[ORMultiMap#entries]].
*/
def unapply[A](m: ORMultiMap[A]): Option[Map[String, Set[A]]] = Some(m.entries)
/**
* Extract the [[ORMultiMap#entries]] of an `ORMultiMap`.
*/
def unapply(value: Any): Option[Map[String, Set[Any]]] = value match {
case m: ORMultiMap[Any] @unchecked Some(m.entries)
case _ None
}
}
/**
* An immutable multi-map implementation. This class wraps an
* [[ORMap]] with an [[ORSet]] for the map's value.
*
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
@SerialVersionUID(1L)
final class ORMultiMap[A] private[akka] (private[akka] val underlying: ORMap[ORSet[A]])
extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {
override type T = ORMultiMap[A]
override def merge(that: T): T =
new ORMultiMap(underlying.merge(that.underlying))
/**
* Scala API: All entries of a multimap where keys are strings and values are sets.
*/
def entries: Map[String, Set[A]] =
underlying.entries.map { case (k, v) k -> v.elements }
/**
* Java API: All entries of a multimap where keys are strings and values are sets.
*/
def getEntries(): java.util.Map[String, java.util.Set[A]] = {
import scala.collection.JavaConverters._
val result = new java.util.HashMap[String, java.util.Set[A]]
underlying.entries.foreach {
case (k, v) result.put(k, v.elements.asJava)
}
result
}
/**
* Get the set associated with the key if there is one.
*/
def get(key: String): Option[Set[A]] =
underlying.get(key).map(_.elements)
/**
* Scala API: Get the set associated with the key if there is one,
* else return the given default.
*/
def getOrElse(key: String, default: Set[A]): Set[A] =
get(key).getOrElse(default)
def contains(key: String): Boolean = underlying.contains(key)
def isEmpty: Boolean = underlying.isEmpty
def size: Int = underlying.size
/**
* Convenience for put. Requires an implicit Cluster.
* @see [[#put]]
*/
def +(entry: (String, Set[A]))(implicit node: Cluster): ORMultiMap[A] = {
val (key, value) = entry
put(node, key, value)
}
/**
* Scala API: Associate an entire set with the key while retaining the history of the previous
* replicated data set.
*/
def put(node: Cluster, key: String, value: Set[A]): ORMultiMap[A] =
put(node.selfUniqueAddress, key, value)
/**
* Java API: Associate an entire set with the key while retaining the history of the previous
* replicated data set.
*/
def put(node: Cluster, key: String, value: java.util.Set[A]): ORMultiMap[A] = {
import scala.collection.JavaConverters._
put(node, key, value.asScala.toSet)
}
/**
* INTERNAL API
*/
private[akka] def put(node: UniqueAddress, key: String, value: Set[A]): ORMultiMap[A] = {
val newUnderlying = underlying.updated(node, key, ORSet.empty[A]) { existing
value.foldLeft(existing.clear(node)) { (s, element) s.add(node, element) }
}
new ORMultiMap(newUnderlying)
}
/**
* Convenience for remove. Requires an implicit Cluster.
* @see [[#remove]]
*/
def -(key: String)(implicit node: Cluster): ORMultiMap[A] =
remove(node, key)
/**
* Remove an entire set associated with the key.
*/
def remove(node: Cluster, key: String): ORMultiMap[A] =
remove(node.selfUniqueAddress, key)
/**
* INTERNAL API
*/
private[akka] def remove(node: UniqueAddress, key: String): ORMultiMap[A] =
new ORMultiMap(underlying.remove(node, key))
/**
* Scala API: Add an element to a set associated with a key. If there is no existing set then one will be initialised.
*/
def addBinding(key: String, element: A)(implicit node: Cluster): ORMultiMap[A] =
addBinding(node.selfUniqueAddress, key, element)
/**
* Java API: Add an element to a set associated with a key. If there is no existing set then one will be initialised.
*/
def addBinding(node: Cluster, key: String, element: A): ORMultiMap[A] =
addBinding(key, element)(node)
/**
* INTERNAL API
*/
private[akka] def addBinding(node: UniqueAddress, key: String, element: A): ORMultiMap[A] = {
val newUnderlying = underlying.updated(node, key, ORSet.empty[A])(_.add(node, element))
new ORMultiMap(newUnderlying)
}
/**
* Scala API: Remove an element of a set associated with a key. If there are no more elements in the set then the
* entire set will be removed.
*/
def removeBinding(key: String, element: A)(implicit node: Cluster): ORMultiMap[A] =
removeBinding(node.selfUniqueAddress, key, element)
/**
* Java API: Remove an element of a set associated with a key. If there are no more elements in the set then the
* entire set will be removed.
*/
def removeBinding(node: Cluster, key: String, element: A): ORMultiMap[A] =
removeBinding(key, element)(node)
/**
* INTERNAL API
*/
private[akka] def removeBinding(node: UniqueAddress, key: String, element: A): ORMultiMap[A] = {
val newUnderlying = {
val u = underlying.updated(node, key, ORSet.empty[A])(_.remove(node, element))
u.get(key) match {
case Some(s) if s.isEmpty u.remove(node, key)
case _ u
}
}
new ORMultiMap(newUnderlying)
}
/**
* Replace an element of a set associated with a key with a new one if it is different. This is useful when an element is removed
* and another one is added within the same Update. The order of addition and removal is important in order
* to retain history for replicated data.
*/
def replaceBinding(key: String, oldElement: A, newElement: A)(implicit node: Cluster): ORMultiMap[A] =
replaceBinding(node.selfUniqueAddress, key, oldElement, newElement)
/**
* INTERNAL API
*/
private[akka] def replaceBinding(node: UniqueAddress, key: String, oldElement: A, newElement: A): ORMultiMap[A] =
if (newElement != oldElement)
addBinding(node, key, newElement).removeBinding(node, key, oldElement)
else
this
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
underlying.needPruningFrom(removedNode)
override def pruningCleanup(removedNode: UniqueAddress): T =
new ORMultiMap(underlying.pruningCleanup(removedNode))
override def prune(removedNode: UniqueAddress, collapseInto: UniqueAddress): T =
new ORMultiMap(underlying.prune(removedNode, collapseInto))
// this class cannot be a `case class` because we need different `unapply`
override def toString: String = s"ORMulti$entries"
override def equals(o: Any): Boolean = o match {
case other: ORMultiMap[_] underlying == other.underlying
case _ false
}
override def hashCode: Int = underlying.hashCode
}
object ORMultiMapKey {
def create[A](id: String): Key[ORMultiMap[A]] = ORMultiMapKey(id)
}
@SerialVersionUID(1L)
final case class ORMultiMapKey[A](_id: String) extends Key[ORMultiMap[A]](_id) with ReplicatedDataSerialization

View file

@ -568,7 +568,7 @@ object Replicator {
* <li>Counters: [[GCounter]], [[PNCounter]]</li>
* <li>Registers: [[LWWRegister]], [[Flag]]</li>
* <li>Sets: [[GSet]], [[ORSet]]</li>
* <li>Maps: [[ORMap]], [[LWWMap]], [[PNCounterMap]]</li>
* <li>Maps: [[ORMap]], [[ORMultiMap]], [[LWWMap]], [[PNCounterMap]]</li>
* </ul>
*
* For good introduction to the CRDT subject watch the

View file

@ -46,6 +46,8 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
private val LWWMapKeyManifest = "i"
private val PNCounterMapManifest = "J"
private val PNCounterMapKeyManifest = "j"
private val ORMultiMapManifest = "K"
private val ORMultiMapKeyManifest = "k"
private val VersionVectorManifest = "L"
private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] AnyRef](
@ -58,6 +60,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
ORMapManifest -> ormapFromBinary,
LWWMapManifest -> lwwmapFromBinary,
PNCounterMapManifest -> pncountermapFromBinary,
ORMultiMapManifest -> multimapFromBinary,
DeletedDataManifest -> (_ DeletedData),
VersionVectorManifest -> versionVectorFromBinary,
@ -69,7 +72,8 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
PNCounterKeyManifest -> (bytes PNCounterKey(keyIdFromBinary(bytes))),
ORMapKeyManifest -> (bytes ORMapKey(keyIdFromBinary(bytes))),
LWWMapKeyManifest -> (bytes LWWMapKey(keyIdFromBinary(bytes))),
PNCounterMapKeyManifest -> (bytes PNCounterMapKey(keyIdFromBinary(bytes))))
PNCounterMapKeyManifest -> (bytes PNCounterMapKey(keyIdFromBinary(bytes))),
ORMultiMapKeyManifest -> (bytes ORMultiMapKey(keyIdFromBinary(bytes))))
override def manifest(obj: AnyRef): String = obj match {
case _: ORSet[_] ORSetManifest
@ -81,6 +85,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
case _: ORMap[_] ORMapManifest
case _: LWWMap[_] LWWMapManifest
case _: PNCounterMap PNCounterMapManifest
case _: ORMultiMap[_] ORMultiMapManifest
case DeletedData DeletedDataManifest
case _: VersionVector VersionVectorManifest
@ -93,6 +98,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
case _: ORMapKey[_] ORMapKeyManifest
case _: LWWMapKey[_] LWWMapKeyManifest
case _: PNCounterMapKey PNCounterMapKeyManifest
case _: ORMultiMapKey[_] ORMultiMapKeyManifest
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
@ -108,6 +114,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
case m: ORMap[_] compress(ormapToProto(m))
case m: LWWMap[_] compress(lwwmapToProto(m))
case m: PNCounterMap compress(pncountermapToProto(m))
case m: ORMultiMap[_] compress(multimapToProto(m))
case DeletedData dm.Empty.getDefaultInstance.toByteArray
case m: VersionVector versionVectorToProto(m).toByteArray
case Key(id) keyIdToBinary(id)
@ -371,6 +378,26 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
entries))
}
def multimapToProto(multimap: ORMultiMap[_]): rd.ORMultiMap = {
val b = rd.ORMultiMap.newBuilder().setKeys(orsetToProto(multimap.underlying.keys))
multimap.underlying.entries.toVector.sortBy { case (key, _) key }.foreach {
case (key, value) b.addEntries(rd.ORMultiMap.Entry.newBuilder().
setKey(key).setValue(orsetToProto(value)))
}
b.build()
}
def multimapFromBinary(bytes: Array[Byte]): ORMultiMap[Any] =
multimapFromProto(rd.ORMultiMap.parseFrom(decompress(bytes)))
def multimapFromProto(multimap: rd.ORMultiMap): ORMultiMap[Any] = {
val entries = multimap.getEntriesList.asScala.map(entry
entry.getKey -> orsetFromProto(entry.getValue)).toMap
new ORMultiMap(new ORMap(
keys = orsetFromProto(multimap.getKeys).asInstanceOf[ORSet[String]],
entries))
}
def keyIdToBinary(id: String): Array[Byte] =
id.getBytes(UTF_8)

View file

@ -0,0 +1,122 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.ddata
import akka.actor.Address
import akka.cluster.UniqueAddress
import akka.cluster.ddata.Replicator.Changed
import org.scalatest.{ Matchers, WordSpec }
class ORMultiMapSpec extends WordSpec with Matchers {
val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1)
val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2)
"A ORMultiMap" must {
"be able to add entries" in {
val m = ORMultiMap().addBinding(node1, "a", "A").addBinding(node1, "b", "B")
m.entries should be(Map("a" -> Set("A"), "b" -> Set("B")))
val m2 = m.addBinding(node1, "a", "C")
m2.entries should be(Map("a" -> Set("A", "C"), "b" -> Set("B")))
}
"be able to remove entry" in {
val m = ORMultiMap().addBinding(node1, "a", "A").addBinding(node1, "b", "B").removeBinding(node1, "a", "A")
m.entries should be(Map("b" -> Set("B")))
}
"be able to replace an entry" in {
val m = ORMultiMap().addBinding(node1, "a", "A").replaceBinding(node1, "a", "A", "B")
m.entries should be(Map("a" -> Set("B")))
}
"be able to have its entries correctly merged with another ORMultiMap with other entries" in {
val m1 = ORMultiMap().addBinding(node1, "a", "A").addBinding(node1, "b", "B")
val m2 = ORMultiMap().addBinding(node2, "c", "C")
// merge both ways
val expectedMerge = Map(
"a" -> Set("A"),
"b" -> Set("B"),
"c" -> Set("C"))
val merged1 = m1 merge m2
merged1.entries should be(expectedMerge)
val merged2 = m2 merge m1
merged2.entries should be(expectedMerge)
}
"be able to have its entries correctly merged with another ORMultiMap with overlapping entries" in {
val m1 = ORMultiMap()
.addBinding(node1, "a", "A1")
.addBinding(node1, "b", "B1")
.removeBinding(node1, "a", "A1")
.addBinding(node1, "d", "D1")
val m2 = ORMultiMap()
.addBinding(node2, "c", "C2")
.addBinding(node2, "a", "A2")
.addBinding(node2, "b", "B2")
.removeBinding(node2, "b", "B2")
.addBinding(node2, "d", "D2")
// merge both ways
val expectedMerged = Map(
"a" -> Set("A2"),
"b" -> Set("B1"),
"c" -> Set("C2"),
"d" -> Set("D1", "D2"))
val merged1 = m1 merge m2
merged1.entries should be(expectedMerged)
val merged2 = m2 merge m1
merged2.entries should be(expectedMerged)
}
}
"be able to get all bindings for an entry and then reduce them upon putting them back" in {
val m = ORMultiMap().addBinding(node1, "a", "A1").addBinding(node1, "a", "A2").addBinding(node1, "b", "B1")
val Some(a) = m.get("a")
a should be(Set("A1", "A2"))
val m2 = m.put(node1, "a", a - "A1")
val expectedMerged = Map(
"a" -> Set("A2"),
"b" -> Set("B1"))
m2.entries should be(expectedMerged)
}
"return the value for an existing key and the default for a non-existing one when using getOrElse" in {
val m = ORMultiMap().addBinding(node1, "a", "A")
m.getOrElse("a", Set("B")) shouldBe Set("A")
m.getOrElse("b", Set("B")) shouldBe Set("B")
}
"remove all bindings for a given key" in {
val m = ORMultiMap().addBinding(node1, "a", "A1").addBinding(node1, "a", "A2").addBinding(node1, "b", "B1")
val m2 = m.remove(node1, "a")
m2.entries should be(Map("b" -> Set("B1")))
}
"have unapply extractor" in {
val m1 = ORMultiMap.empty.put(node1, "a", Set(1L, 2L)).put(node2, "b", Set(3L))
val m2: ORMultiMap[Long] = m1
val ORMultiMap(entries1) = m1
val entries2: Map[String, Set[Long]] = entries1
Changed(ORMultiMapKey[Long]("key"))(m1) match {
case c @ Changed(ORMultiMapKey("key"))
val ORMultiMap(entries3) = c.dataValue
val entries4: Map[String, Set[Long]] = entries3
entries4 should be(Map("a" -> Set(1L, 2L), "b" -> Set(3L)))
}
}
}

View file

@ -16,6 +16,7 @@ import akka.cluster.ddata.GSet
import akka.cluster.ddata.LWWMap
import akka.cluster.ddata.LWWRegister
import akka.cluster.ddata.ORMap
import akka.cluster.ddata.ORMultiMap
import akka.cluster.ddata.ORSet
import akka.cluster.ddata.PNCounter
import akka.cluster.ddata.PNCounterMap
@ -154,6 +155,19 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem("ReplicatedDataSe
increment(address2, "b", 5))
}
"serialize ORMultiMap" in {
checkSerialization(ORMultiMap())
checkSerialization(ORMultiMap().addBinding(address1, "a", "A"))
checkSerialization(ORMultiMap.empty[String]
.addBinding(address1, "a", "A1")
.put(address2, "b", Set("B1", "B2", "B3"))
.addBinding(address2, "a", "A2"))
val m1 = ORMultiMap.empty[String].addBinding(address1, "a", "A1").addBinding(address2, "a", "A2")
val m2 = ORMultiMap.empty[String].put(address2, "b", Set("B1", "B2", "B3"))
checkSameContent(m1.merge(m2), m2.merge(m1))
}
"serialize DeletedData" in {
checkSerialization(DeletedData)
}