2015-06-18 16:17:53 +02:00
|
|
|
/**
|
2017-01-04 17:37:10 +01:00
|
|
|
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
2015-06-18 16:17:53 +02:00
|
|
|
*/
|
|
|
|
|
package akka.cluster.ddata
|
|
|
|
|
|
2017-02-23 01:20:33 +01:00
|
|
|
import akka.cluster.{ Cluster, UniqueAddress }
|
2017-02-07 11:21:56 +01:00
|
|
|
import akka.annotation.InternalApi
|
2017-02-23 01:20:33 +01:00
|
|
|
import akka.cluster.ddata.ORMap._
|
2015-06-18 16:17:53 +02:00
|
|
|
|
|
|
|
|
object ORMultiMap {
|
2017-03-24 16:18:01 +01:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
|
|
|
|
@InternalApi private[akka] case object ORMultiMapTag extends ZeroTag {
|
|
|
|
|
override def zero: DeltaReplicatedData = ORMultiMap.empty
|
|
|
|
|
override final val value: Int = 2
|
|
|
|
|
}
|
2015-06-18 16:17:53 +02:00
|
|
|
|
2017-03-24 16:18:01 +01:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
|
|
|
|
@InternalApi private[akka] case object ORMultiMapWithValueDeltasTag extends ZeroTag {
|
|
|
|
|
override def zero: DeltaReplicatedData = ORMultiMap.emptyWithValueDeltas
|
|
|
|
|
override final val value: Int = 3
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val _empty: ORMultiMap[Any, Any] = new ORMultiMap(new ORMap(ORSet.empty, Map.empty, zeroTag = ORMultiMapTag), false)
|
|
|
|
|
val _emptyWithValueDeltas: ORMultiMap[Any, Any] = new ORMultiMap(new ORMap(ORSet.empty, Map.empty, zeroTag = ORMultiMapWithValueDeltasTag), true)
|
2015-06-18 16:17:53 +02:00
|
|
|
/**
|
|
|
|
|
* Provides an empty multimap.
|
|
|
|
|
*/
|
2016-12-22 11:47:27 +01:00
|
|
|
def empty[A, B]: ORMultiMap[A, B] = _empty.asInstanceOf[ORMultiMap[A, B]]
|
2017-02-23 01:20:33 +01:00
|
|
|
def emptyWithValueDeltas[A, B]: ORMultiMap[A, B] = _emptyWithValueDeltas.asInstanceOf[ORMultiMap[A, B]]
|
2016-12-22 11:47:27 +01:00
|
|
|
def apply(): ORMultiMap[Any, Any] = _empty
|
2015-06-18 16:17:53 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Java API
|
|
|
|
|
*/
|
2016-12-22 11:47:27 +01:00
|
|
|
def create[A, B](): ORMultiMap[A, B] = empty[A, B]
|
2015-06-18 16:17:53 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Extract the [[ORMultiMap#entries]].
|
|
|
|
|
*/
|
2016-12-22 11:47:27 +01:00
|
|
|
def unapply[A, B](m: ORMultiMap[A, B]): Option[Map[A, Set[B]]] = Some(m.entries)
|
2015-06-18 16:17:53 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Extract the [[ORMultiMap#entries]] of an `ORMultiMap`.
|
|
|
|
|
*/
|
2016-12-22 11:47:27 +01:00
|
|
|
def unapply[A, B <: ReplicatedData](value: Any): Option[Map[A, Set[B]]] = value match {
|
|
|
|
|
case m: ORMultiMap[A, B] @unchecked ⇒ Some(m.entries)
|
|
|
|
|
case _ ⇒ None
|
2015-06-18 16:17:53 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* An immutable multi-map implementation. This class wraps an
|
|
|
|
|
* [[ORMap]] with an [[ORSet]] for the map's value.
|
|
|
|
|
*
|
|
|
|
|
* This class is immutable, i.e. "modifying" methods return a new instance.
|
|
|
|
|
*/
|
|
|
|
|
@SerialVersionUID(1L)
|
2017-02-23 01:20:33 +01:00
|
|
|
final class ORMultiMap[A, B] private[akka] (
|
|
|
|
|
private[akka] val underlying: ORMap[A, ORSet[B]],
|
|
|
|
|
private[akka] val withValueDeltas: Boolean)
|
|
|
|
|
extends DeltaReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {
|
2015-06-18 16:17:53 +02:00
|
|
|
|
2016-12-22 11:47:27 +01:00
|
|
|
override type T = ORMultiMap[A, B]
|
2017-02-23 01:20:33 +01:00
|
|
|
override type D = ORMap.DeltaOp
|
2015-06-18 16:17:53 +02:00
|
|
|
|
|
|
|
|
override def merge(that: T): T =
|
2017-02-23 01:20:33 +01:00
|
|
|
if (withValueDeltas == that.withValueDeltas) {
|
|
|
|
|
if (withValueDeltas)
|
|
|
|
|
new ORMultiMap(underlying.mergeRetainingDeletedValues(that.underlying), withValueDeltas)
|
|
|
|
|
else
|
|
|
|
|
new ORMultiMap(underlying.merge(that.underlying), withValueDeltas)
|
|
|
|
|
} else throw new IllegalArgumentException("Trying to merge two ORMultiMaps of different map sub-type")
|
2015-06-18 16:17:53 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Scala API: All entries of a multimap where keys are strings and values are sets.
|
|
|
|
|
*/
|
2017-02-23 01:20:33 +01:00
|
|
|
def entries: Map[A, Set[B]] = if (withValueDeltas)
|
|
|
|
|
underlying.entries.collect { case (k, v) if underlying.keys.elements.contains(k) ⇒ k → v.elements }
|
|
|
|
|
else
|
2016-06-02 14:06:57 +02:00
|
|
|
underlying.entries.map { case (k, v) ⇒ k → v.elements }
|
2015-06-18 16:17:53 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Java API: All entries of a multimap where keys are strings and values are sets.
|
|
|
|
|
*/
|
2016-12-22 11:47:27 +01:00
|
|
|
def getEntries(): java.util.Map[A, java.util.Set[B]] = {
|
2015-06-18 16:17:53 +02:00
|
|
|
import scala.collection.JavaConverters._
|
2016-12-22 11:47:27 +01:00
|
|
|
val result = new java.util.HashMap[A, java.util.Set[B]]
|
2017-02-23 01:20:33 +01:00
|
|
|
if (withValueDeltas)
|
|
|
|
|
underlying.entries.foreach { case (k, v) ⇒ if (underlying.keys.elements.contains(k)) result.put(k, v.elements.asJava) }
|
|
|
|
|
else
|
|
|
|
|
underlying.entries.foreach { case (k, v) ⇒ result.put(k, v.elements.asJava) }
|
2015-06-18 16:17:53 +02:00
|
|
|
result
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Get the set associated with the key if there is one.
|
|
|
|
|
*/
|
2016-12-22 11:47:27 +01:00
|
|
|
def get(key: A): Option[Set[B]] =
|
2017-02-23 01:20:33 +01:00
|
|
|
if (withValueDeltas && !underlying.keys.elements.contains(key))
|
|
|
|
|
None
|
|
|
|
|
else
|
|
|
|
|
underlying.get(key).map(_.elements)
|
2015-06-18 16:17:53 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Scala API: Get the set associated with the key if there is one,
|
|
|
|
|
* else return the given default.
|
|
|
|
|
*/
|
2016-12-22 11:47:27 +01:00
|
|
|
def getOrElse(key: A, default: ⇒ Set[B]): Set[B] =
|
2015-06-18 16:17:53 +02:00
|
|
|
get(key).getOrElse(default)
|
|
|
|
|
|
2017-02-23 01:20:33 +01:00
|
|
|
def contains(key: A): Boolean = underlying.keys.elements.contains(key)
|
2015-06-18 16:17:53 +02:00
|
|
|
|
2017-02-23 01:20:33 +01:00
|
|
|
def isEmpty: Boolean = underlying.keys.elements.isEmpty
|
2015-06-18 16:17:53 +02:00
|
|
|
|
2017-02-23 01:20:33 +01:00
|
|
|
def size: Int = underlying.keys.elements.size
|
2015-06-18 16:17:53 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Convenience for put. Requires an implicit Cluster.
|
|
|
|
|
* @see [[#put]]
|
|
|
|
|
*/
|
2016-12-22 11:47:27 +01:00
|
|
|
def +(entry: (A, Set[B]))(implicit node: Cluster): ORMultiMap[A, B] = {
|
2015-06-18 16:17:53 +02:00
|
|
|
val (key, value) = entry
|
|
|
|
|
put(node, key, value)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Scala API: Associate an entire set with the key while retaining the history of the previous
|
|
|
|
|
* replicated data set.
|
|
|
|
|
*/
|
2016-12-22 11:47:27 +01:00
|
|
|
def put(node: Cluster, key: A, value: Set[B]): ORMultiMap[A, B] =
|
2015-06-18 16:17:53 +02:00
|
|
|
put(node.selfUniqueAddress, key, value)
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Java API: Associate an entire set with the key while retaining the history of the previous
|
|
|
|
|
* replicated data set.
|
|
|
|
|
*/
|
2016-12-22 11:47:27 +01:00
|
|
|
def put(node: Cluster, key: A, value: java.util.Set[B]): ORMultiMap[A, B] = {
|
2015-06-18 16:17:53 +02:00
|
|
|
import scala.collection.JavaConverters._
|
|
|
|
|
put(node, key, value.asScala.toSet)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-02-07 11:21:56 +01:00
|
|
|
@InternalApi private[akka] def put(node: UniqueAddress, key: A, value: Set[B]): ORMultiMap[A, B] = {
|
2017-02-23 01:20:33 +01:00
|
|
|
val newUnderlying = underlying.updated(node, key, ORSet.empty[B], valueDeltas = withValueDeltas) { existing ⇒
|
2015-06-18 16:17:53 +02:00
|
|
|
value.foldLeft(existing.clear(node)) { (s, element) ⇒ s.add(node, element) }
|
|
|
|
|
}
|
2017-02-23 01:20:33 +01:00
|
|
|
new ORMultiMap(newUnderlying, withValueDeltas)
|
2015-06-18 16:17:53 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Convenience for remove. Requires an implicit Cluster.
|
|
|
|
|
* @see [[#remove]]
|
|
|
|
|
*/
|
2016-12-22 11:47:27 +01:00
|
|
|
def -(key: A)(implicit node: Cluster): ORMultiMap[A, B] =
|
2015-06-18 16:17:53 +02:00
|
|
|
remove(node, key)
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Remove an entire set associated with the key.
|
|
|
|
|
*/
|
2016-12-22 11:47:27 +01:00
|
|
|
def remove(node: Cluster, key: A): ORMultiMap[A, B] =
|
2015-06-18 16:17:53 +02:00
|
|
|
remove(node.selfUniqueAddress, key)
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-02-23 01:20:33 +01:00
|
|
|
@InternalApi private[akka] def remove(node: UniqueAddress, key: A): ORMultiMap[A, B] = {
|
|
|
|
|
if (withValueDeltas) {
|
|
|
|
|
val u = underlying.updated(node, key, ORSet.empty[B], valueDeltas = true) { existing ⇒ existing.clear(node) }
|
|
|
|
|
new ORMultiMap(u.removeKey(node, key), withValueDeltas)
|
|
|
|
|
} else {
|
|
|
|
|
new ORMultiMap(underlying.remove(node, key), withValueDeltas)
|
|
|
|
|
}
|
|
|
|
|
}
|
2015-06-18 16:17:53 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Scala API: Add an element to a set associated with a key. If there is no existing set then one will be initialised.
|
|
|
|
|
*/
|
2016-12-22 11:47:27 +01:00
|
|
|
def addBinding(key: A, element: B)(implicit node: Cluster): ORMultiMap[A, B] =
|
2015-06-18 16:17:53 +02:00
|
|
|
addBinding(node.selfUniqueAddress, key, element)
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Java API: Add an element to a set associated with a key. If there is no existing set then one will be initialised.
|
|
|
|
|
*/
|
2016-12-22 11:47:27 +01:00
|
|
|
def addBinding(node: Cluster, key: A, element: B): ORMultiMap[A, B] =
|
2015-06-18 16:17:53 +02:00
|
|
|
addBinding(key, element)(node)
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-02-07 11:21:56 +01:00
|
|
|
@InternalApi private[akka] def addBinding(node: UniqueAddress, key: A, element: B): ORMultiMap[A, B] = {
|
2017-02-23 01:20:33 +01:00
|
|
|
val newUnderlying = underlying.updated(node, key, ORSet.empty[B], valueDeltas = withValueDeltas)(_.add(node, element))
|
|
|
|
|
new ORMultiMap(newUnderlying, withValueDeltas)
|
2015-06-18 16:17:53 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Scala API: Remove an element of a set associated with a key. If there are no more elements in the set then the
|
|
|
|
|
* entire set will be removed.
|
|
|
|
|
*/
|
2016-12-22 11:47:27 +01:00
|
|
|
def removeBinding(key: A, element: B)(implicit node: Cluster): ORMultiMap[A, B] =
|
2015-06-18 16:17:53 +02:00
|
|
|
removeBinding(node.selfUniqueAddress, key, element)
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Java API: Remove an element of a set associated with a key. If there are no more elements in the set then the
|
|
|
|
|
* entire set will be removed.
|
|
|
|
|
*/
|
2016-12-22 11:47:27 +01:00
|
|
|
def removeBinding(node: Cluster, key: A, element: B): ORMultiMap[A, B] =
|
2015-06-18 16:17:53 +02:00
|
|
|
removeBinding(key, element)(node)
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-02-07 11:21:56 +01:00
|
|
|
@InternalApi private[akka] def removeBinding(node: UniqueAddress, key: A, element: B): ORMultiMap[A, B] = {
|
2015-06-18 16:17:53 +02:00
|
|
|
val newUnderlying = {
|
2017-02-23 01:20:33 +01:00
|
|
|
val u = underlying.updated(node, key, ORSet.empty[B], valueDeltas = withValueDeltas)(_.remove(node, element))
|
2015-06-18 16:17:53 +02:00
|
|
|
u.get(key) match {
|
2017-02-23 01:20:33 +01:00
|
|
|
case Some(s) if s.isEmpty ⇒
|
|
|
|
|
if (withValueDeltas)
|
|
|
|
|
u.removeKey(node, key)
|
|
|
|
|
else
|
|
|
|
|
u.remove(node, key)
|
|
|
|
|
case _ ⇒ u
|
2015-06-18 16:17:53 +02:00
|
|
|
}
|
|
|
|
|
}
|
2017-02-23 01:20:33 +01:00
|
|
|
new ORMultiMap(newUnderlying, withValueDeltas)
|
2015-06-18 16:17:53 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Replace an element of a set associated with a key with a new one if it is different. This is useful when an element is removed
|
|
|
|
|
* and another one is added within the same Update. The order of addition and removal is important in order
|
|
|
|
|
* to retain history for replicated data.
|
|
|
|
|
*/
|
2016-12-22 11:47:27 +01:00
|
|
|
def replaceBinding(key: A, oldElement: B, newElement: B)(implicit node: Cluster): ORMultiMap[A, B] =
|
2015-06-18 16:17:53 +02:00
|
|
|
replaceBinding(node.selfUniqueAddress, key, oldElement, newElement)
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-02-07 11:21:56 +01:00
|
|
|
@InternalApi private[akka] def replaceBinding(node: UniqueAddress, key: A, oldElement: B, newElement: B): ORMultiMap[A, B] =
|
2015-06-18 16:17:53 +02:00
|
|
|
if (newElement != oldElement)
|
|
|
|
|
addBinding(node, key, newElement).removeBinding(node, key, oldElement)
|
|
|
|
|
else
|
|
|
|
|
this
|
|
|
|
|
|
2017-02-23 01:20:33 +01:00
|
|
|
override def resetDelta: ORMultiMap[A, B] =
|
|
|
|
|
new ORMultiMap(underlying.resetDelta, withValueDeltas)
|
|
|
|
|
|
|
|
|
|
override def delta: Option[D] = underlying.delta
|
|
|
|
|
|
|
|
|
|
override def mergeDelta(thatDelta: D): ORMultiMap[A, B] =
|
|
|
|
|
new ORMultiMap(underlying.mergeDelta(thatDelta), withValueDeltas)
|
|
|
|
|
|
2017-01-11 13:19:45 +01:00
|
|
|
override def modifiedByNodes: Set[UniqueAddress] =
|
|
|
|
|
underlying.modifiedByNodes
|
|
|
|
|
|
2015-06-18 16:17:53 +02:00
|
|
|
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
|
|
|
|
|
underlying.needPruningFrom(removedNode)
|
|
|
|
|
|
|
|
|
|
override def pruningCleanup(removedNode: UniqueAddress): T =
|
2017-02-23 01:20:33 +01:00
|
|
|
new ORMultiMap(underlying.pruningCleanup(removedNode), withValueDeltas)
|
2015-06-18 16:17:53 +02:00
|
|
|
|
|
|
|
|
override def prune(removedNode: UniqueAddress, collapseInto: UniqueAddress): T =
|
2017-02-23 01:20:33 +01:00
|
|
|
new ORMultiMap(underlying.prune(removedNode, collapseInto), withValueDeltas)
|
2015-06-18 16:17:53 +02:00
|
|
|
|
|
|
|
|
// this class cannot be a `case class` because we need different `unapply`
|
|
|
|
|
|
|
|
|
|
override def toString: String = s"ORMulti$entries"
|
|
|
|
|
|
|
|
|
|
override def equals(o: Any): Boolean = o match {
|
2016-12-22 11:47:27 +01:00
|
|
|
case other: ORMultiMap[_, _] ⇒ underlying == other.underlying
|
|
|
|
|
case _ ⇒ false
|
2015-06-18 16:17:53 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def hashCode: Int = underlying.hashCode
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
object ORMultiMapKey {
|
2016-12-22 11:47:27 +01:00
|
|
|
def create[A, B](id: String): Key[ORMultiMap[A, B]] = ORMultiMapKey(id)
|
2015-06-18 16:17:53 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@SerialVersionUID(1L)
|
2016-12-22 11:47:27 +01:00
|
|
|
final case class ORMultiMapKey[A, B](_id: String) extends Key[ORMultiMap[A, B]](_id) with ReplicatedDataSerialization
|