Typed Distributed Data requires untyped Cluster #25746 (#26074)

Typed Distributed Data requires untyped Cluster [#25746](https://github.com/akka/akka/issues/25746)
Helena Edelson, 2018-12-14 15:53:08 -05:00, committed by GitHub
parent 2c145cd3c3
commit 8a44fca087
44 changed files with 656 additions and 368 deletions
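
The gist of the change: Distributed Data update operations used to demand an implicit untyped `akka.cluster.Cluster`, which forced typed Distributed Data to pull in the untyped Cluster extension. They now take a `SelfUniqueAddress` instead, which either flavor of the extension can provide. A minimal before/after sketch (assuming an `ActorSystem` named `system` on a node that has joined the cluster):

```scala
import akka.actor.ActorSystem
import akka.cluster.ddata.{ DistributedData, GCounter, SelfUniqueAddress }

val system: ActorSystem = ActorSystem("example")

// Before (now deprecated): updates needed the untyped Cluster extension.
// implicit val cluster: akka.cluster.Cluster = akka.cluster.Cluster(system)
// val counter = GCounter.empty + 1

// After: updates take the cluster-flavor-agnostic SelfUniqueAddress.
implicit val node: SelfUniqueAddress = DistributedData(system).selfUniqueAddress
val counter: GCounter = GCounter.empty :+ 1
```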

akka/cluster/ddata/DistributedData.scala

@ -10,7 +10,7 @@ import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.cluster.Cluster
import akka.cluster.{ Cluster, UniqueAddress }
object DistributedData extends ExtensionId[DistributedData] with ExtensionIdProvider {
override def get(system: ActorSystem): DistributedData = super.get(system)
@ -28,14 +28,9 @@ object DistributedData extends ExtensionId[DistributedData] with ExtensionIdProv
*/
class DistributedData(system: ExtendedActorSystem) extends Extension {
private val config = system.settings.config.getConfig("akka.cluster.distributed-data")
private val settings = ReplicatorSettings(config)
private val settings = ReplicatorSettings(system)
/**
* Returns true if this member is not tagged with the role configured for the
* replicas.
*/
def isTerminated: Boolean = Cluster(system).isTerminated || !settings.roles.subsetOf(Cluster(system).selfRoles)
implicit val selfUniqueAddress: SelfUniqueAddress = SelfUniqueAddress(Cluster(system).selfUniqueAddress)
/**
* `ActorRef` of the [[Replicator]] .
@ -45,7 +40,20 @@ class DistributedData(system: ExtendedActorSystem) extends Extension {
system.log.warning("Replicator points to dead letters: Make sure the cluster node is not terminated and has the proper role!")
system.deadLetters
} else {
val name = config.getString("name")
system.systemActorOf(Replicator.props(settings), name)
system.systemActorOf(Replicator.props(settings), ReplicatorSettings.name(system, None))
}
/**
* Returns true if this member is not tagged with the role configured for the
* replicas.
*/
def isTerminated: Boolean =
Cluster(system).isTerminated || !settings.roles.subsetOf(Cluster(system).selfRoles)
}
/**
* Cluster non-specific (typed vs untyped) wrapper for [[akka.cluster.UniqueAddress]].
*/
@SerialVersionUID(1L)
final case class SelfUniqueAddress(uniqueAddress: UniqueAddress)
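
`SelfUniqueAddress` is a plain serializable wrapper, so any component that knows the node's `UniqueAddress` can construct the implicit the update APIs now require; a sketch:

```scala
import akka.cluster.Cluster
import akka.cluster.ddata.SelfUniqueAddress

// The untyped extension is one possible source; the typed Cluster
// extension can provide the same wrapper without this dependency.
def selfFrom(cluster: Cluster): SelfUniqueAddress =
  SelfUniqueAddress(cluster.selfUniqueAddress)
```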

akka/cluster/ddata/GCounter.scala

@ -65,14 +65,19 @@ final class GCounter private[akka] (
* Increment the counter with the delta `n` specified.
* The delta must be zero or positive.
*/
def +(n: Long)(implicit node: Cluster): GCounter = increment(node, n)
def :+(n: Long)(implicit node: SelfUniqueAddress): GCounter = increment(node.uniqueAddress, n)
@deprecated("Use `:+` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def +(n: Long)(implicit node: Cluster): GCounter = increment(node.selfUniqueAddress, n)
/**
* Increment the counter with the delta `n` specified.
* The delta `n` must be zero or positive.
*/
def increment(node: Cluster, n: Long = 1): GCounter =
increment(node.selfUniqueAddress, n)
def increment(node: SelfUniqueAddress, n: Long): GCounter = increment(node.uniqueAddress, n)
@deprecated("Use `increment` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def increment(node: Cluster, n: Long = 1): GCounter = increment(node.selfUniqueAddress, n)
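
A short usage sketch of the replacement `GCounter` operations, with the implicit operator and the explicit method side by side:

```scala
import akka.cluster.ddata.{ GCounter, SelfUniqueAddress }

// Both forms delegate to the same UniqueAddress-based increment.
def bumpTwice(counter: GCounter)(implicit node: SelfUniqueAddress): GCounter =
  (counter :+ 1).increment(node, 1)
```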
/**
* INTERNAL API

akka/cluster/ddata/LWWMap.scala

@ -4,9 +4,9 @@
package akka.cluster.ddata
import akka.annotation.InternalApi
import akka.cluster.Cluster
import akka.cluster.UniqueAddress
import akka.annotation.InternalApi
import akka.cluster.ddata.ORMap.ZeroTag
object LWWMap {
@ -87,6 +87,12 @@ final class LWWMap[A, B] private[akka] (
/**
* Adds an entry to the map
*/
def :+(entry: (A, B))(implicit node: SelfUniqueAddress): LWWMap[A, B] = {
val (key, value) = entry
put(node, key, value)
}
@deprecated("Use `:+` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def +(entry: (A, B))(implicit node: Cluster): LWWMap[A, B] = {
val (key, value) = entry
put(node, key, value)
@ -95,8 +101,12 @@ final class LWWMap[A, B] private[akka] (
/**
* Adds an entry to the map
*/
def put(node: SelfUniqueAddress, key: A, value: B): LWWMap[A, B] =
put(node.uniqueAddress, key, value, defaultClock[B])
@deprecated("Use `put` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def put(node: Cluster, key: A, value: B): LWWMap[A, B] =
put(node, key, value, defaultClock[B])
put(node.selfUniqueAddress, key, value, defaultClock[B])
/**
* Adds an entry to the map.
@ -106,6 +116,10 @@ final class LWWMap[A, B] private[akka] (
* increasing version number from a database record that is used for optimistic
* concurrency control.
*/
def put(node: SelfUniqueAddress, key: A, value: B, clock: Clock[B]): LWWMap[A, B] =
put(node.uniqueAddress, key, value, clock)
@deprecated("Use `put` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def put(node: Cluster, key: A, value: B, clock: Clock[B]): LWWMap[A, B] =
put(node.selfUniqueAddress, key, value, clock)
@ -117,8 +131,9 @@ final class LWWMap[A, B] private[akka] (
* increasing version number from a database record that is used for optimistic
* concurrency control.
*/
@deprecated("Use `put` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def put(key: A, value: B)(implicit node: Cluster, clock: Clock[B] = defaultClock[B]): LWWMap[A, B] =
put(node, key, value, clock)
put(node.selfUniqueAddress, key, value, clock)
/**
* INTERNAL API
@ -136,6 +151,7 @@ final class LWWMap[A, B] private[akka] (
* Note that if there is a conflicting update on another node the entry will
* not be removed after merge.
*/
@deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def -(key: A)(implicit node: Cluster): LWWMap[A, B] = remove(node, key)
/**
@ -143,6 +159,10 @@ final class LWWMap[A, B] private[akka] (
* Note that if there is a conflicting update on another node the entry will
* not be removed after merge.
*/
def remove(node: SelfUniqueAddress, key: A): LWWMap[A, B] =
remove(node.uniqueAddress, key)
@deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def remove(node: Cluster, key: A): LWWMap[A, B] =
remove(node.selfUniqueAddress, key)
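
A sketch chaining the non-deprecated `LWWMap` operations:

```scala
import akka.cluster.ddata.{ LWWMap, SelfUniqueAddress }

def revise(m: LWWMap[String, Int])(implicit node: SelfUniqueAddress): LWWMap[String, Int] =
  (m :+ ("a" -> 1)) // operator form with implicit node
    .put(node, "b", 2) // explicit-node form
    .remove(node, "a")
```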

akka/cluster/ddata/LWWRegister.scala

@ -4,10 +4,10 @@
package akka.cluster.ddata
import akka.annotation.InternalApi
import akka.cluster.Cluster
import akka.cluster.UniqueAddress
import akka.util.HashCode
import akka.annotation.InternalApi
object LWWRegister {
@ -48,20 +48,48 @@ object LWWRegister {
@InternalApi private[akka] def apply[A](node: UniqueAddress, initialValue: A, clock: Clock[A]): LWWRegister[A] =
new LWWRegister(node, initialValue, clock(0L, initialValue))
def apply[A](node: SelfUniqueAddress, initialValue: A): LWWRegister[A] =
apply(node.uniqueAddress, initialValue, defaultClock[A])
def apply[A](node: SelfUniqueAddress, initialValue: A, clock: Clock[A]): LWWRegister[A] =
apply(node.uniqueAddress, initialValue, clock)
@deprecated("Use `apply` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def apply[A](initialValue: A)(implicit node: Cluster, clock: Clock[A] = defaultClock[A]): LWWRegister[A] =
apply(node.selfUniqueAddress, initialValue, clock)
/**
* Scala API
* Creates an `LWWRegister` with implicits, since the deprecated `apply` functions that take a `Cluster` constrain overloading.
*/
def create[A](initialValue: A)(implicit node: SelfUniqueAddress, clock: Clock[A] = defaultClock[A]): LWWRegister[A] =
apply(node.uniqueAddress, initialValue, clock)
/**
* Java API
*/
@deprecated("Use `create` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def create[A](node: Cluster, initialValue: A): LWWRegister[A] =
apply(initialValue)(node)
/**
* Java API
*/
@deprecated("Use `create` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def create[A](node: Cluster, initialValue: A, clock: Clock[A]): LWWRegister[A] =
apply(initialValue)(node, clock)
apply(node.selfUniqueAddress, initialValue, clock)
/**
* Java API
*/
def create[A](node: SelfUniqueAddress, initialValue: A, clock: Clock[A]): LWWRegister[A] =
apply(node.uniqueAddress, initialValue, clock)
/**
* Java API
*/
def create[A](node: SelfUniqueAddress, initialValue: A): LWWRegister[A] =
apply(node.uniqueAddress, initialValue, defaultClock[A])
/**
* Extract the [[LWWRegister#value]].
@ -122,13 +150,13 @@ final class LWWRegister[A] private[akka] (
* increasing version number from a database record that is used for optimistic
* concurrency control.
*/
def withValue(value: A)(implicit node: Cluster, clock: Clock[A] = defaultClock[A]): LWWRegister[A] =
withValue(node, value, clock)
def withValue(node: SelfUniqueAddress, value: A, clock: Clock[A]): LWWRegister[A] =
withValue(node.uniqueAddress, value, clock)
/**
* Change the value of the register.
*/
def withValue(node: Cluster, value: A): LWWRegister[A] =
def withValue(node: SelfUniqueAddress, value: A): LWWRegister[A] =
withValue(node, value, defaultClock[A])
/**
@ -139,6 +167,18 @@ final class LWWRegister[A] private[akka] (
* increasing version number from a database record that is used for optimistic
* concurrency control.
*/
def withValueOf(value: A)(implicit node: SelfUniqueAddress, clock: Clock[A] = defaultClock[A]): LWWRegister[A] =
withValue(node, value, clock)
@deprecated("Use `withValueOf` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def withValue(value: A)(implicit node: Cluster, clock: Clock[A] = defaultClock[A]): LWWRegister[A] =
withValue(node, value, clock)
@deprecated("Use `withValue` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def withValue(node: Cluster, value: A): LWWRegister[A] =
withValue(node, value, defaultClock[A])
@deprecated("Use `withValue` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def withValue(node: Cluster, value: A, clock: Clock[A]): LWWRegister[A] =
withValue(node.selfUniqueAddress, value, clock)
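
`withValueOf` is introduced because the deprecated `withValue(value)(implicit Cluster, Clock)` overload would be ambiguous with an implicit `SelfUniqueAddress` variant of the same name. A usage sketch:

```scala
import akka.cluster.ddata.{ LWWRegister, SelfUniqueAddress }

def latest(implicit node: SelfUniqueAddress): LWWRegister[String] =
  LWWRegister.create("initial").withValueOf("updated")
```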

akka/cluster/ddata/ORMap.scala

@ -199,9 +199,15 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
* Adds an entry to the map
* @see [[#put]]
*/
def :+(entry: (A, B))(implicit node: SelfUniqueAddress): ORMap[A, B] = {
val (key, value) = entry
put(node.uniqueAddress, key, value)
}
@deprecated("Use `:+` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def +(entry: (A, B))(implicit node: Cluster): ORMap[A, B] = {
val (key, value) = entry
put(node, key, value)
put(node.selfUniqueAddress, key, value)
}
/**
@ -217,6 +223,9 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
* value, because important history can be lost when replacing the `ORSet` and
* undesired effects of merging will occur. Use [[ORMultiMap]] or [[#updated]] instead.
*/
def put(node: SelfUniqueAddress, key: A, value: B): ORMap[A, B] = put(node.uniqueAddress, key, value)
@deprecated("Use `put` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def put(node: Cluster, key: A, value: B): ORMap[A, B] = put(node.selfUniqueAddress, key, value)
/**
@ -241,6 +250,10 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
* If there is no current value for the `key` the `initial` value will be
* passed to the `modify` function.
*/
def updated(node: SelfUniqueAddress, key: A, initial: B)(modify: B ⇒ B): ORMap[A, B] =
updated(node.uniqueAddress, key, initial)(modify)
@deprecated("Use `updated` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def updated(node: Cluster, key: A, initial: B)(modify: B ⇒ B): ORMap[A, B] =
updated(node.selfUniqueAddress, key, initial)(modify)
@ -251,9 +264,9 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
* passed to the `modify` function.
*/
@Deprecated
@deprecated("use update for the Java API as updated is ambiguous with the Scala API", "2.5.19")
@deprecated("use update for the Java API as updated is ambiguous with the Scala API", "2.5.20")
def updated(node: Cluster, key: A, initial: B, modify: java.util.function.Function[B, B]): ORMap[A, B] =
updated(node, key, initial)(value ⇒ modify.apply(value))
updated(node.selfUniqueAddress, key, initial)(value ⇒ modify.apply(value))
/**
* Java API: Replace a value by applying the `modify` function on the existing value.
@ -261,6 +274,11 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
* If there is no current value for the `key` the `initial` value will be
* passed to the `modify` function.
*/
def update(node: SelfUniqueAddress, key: A, initial: B, modify: java.util.function.Function[B, B]): ORMap[A, B] =
updated(node.uniqueAddress, key, initial)(value ⇒ modify.apply(value))
@Deprecated
@deprecated("Use `update` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def update(node: Cluster, key: A, initial: B, modify: java.util.function.Function[B, B]): ORMap[A, B] =
updated(node, key, initial)(value ⇒ modify.apply(value))
@ -295,17 +313,25 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
}
/**
* Scala API
* Removes an entry from the map.
* Note that if there is a conflicting update on another node the entry will
* not be removed after merge.
*/
def -(key: A)(implicit node: Cluster): ORMap[A, B] = remove(node, key)
def remove(key: A)(implicit node: SelfUniqueAddress): ORMap[A, B] = remove(node.uniqueAddress, key)
/**
* Java API
* Removes an entry from the map.
* Note that if there is a conflicting update on another node the entry will
* not be removed after merge.
*/
def remove(node: SelfUniqueAddress, key: A): ORMap[A, B] = remove(node.uniqueAddress, key)
@deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def -(key: A)(implicit node: Cluster): ORMap[A, B] = remove(node.selfUniqueAddress, key)
@deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def remove(node: Cluster, key: A): ORMap[A, B] = remove(node.selfUniqueAddress, key)
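
A sketch of the new `updated` overload, using a `GSet` value since its `+` needs no node (the scaladoc above warns against `put` with `ORSet` values):

```scala
import akka.cluster.ddata.{ GSet, ORMap, SelfUniqueAddress }

def record(m: ORMap[String, GSet[String]], k: String, v: String)(
  implicit node: SelfUniqueAddress): ORMap[String, GSet[String]] =
  m.updated(node, k, GSet.empty[String])(_ + v)
```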
/**

akka/cluster/ddata/ORMultiMap.scala

@ -125,18 +125,28 @@ final class ORMultiMap[A, B] private[akka] (
def size: Int = underlying.keys.elements.size
/**
* Convenience for put. Requires an implicit Cluster.
* Convenience for put. Requires an implicit SelfUniqueAddress.
* @see [[#put]]
*/
def :+(entry: (A, Set[B]))(implicit node: SelfUniqueAddress): ORMultiMap[A, B] = {
val (key, value) = entry
put(node.uniqueAddress, key, value)
}
@deprecated("Use `:+` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def +(entry: (A, Set[B]))(implicit node: Cluster): ORMultiMap[A, B] = {
val (key, value) = entry
put(node, key, value)
put(node.selfUniqueAddress, key, value)
}
/**
* Scala API: Associate an entire set with the key while retaining the history of the previous
* replicated data set.
*/
def put(node: SelfUniqueAddress, key: A, value: Set[B]): ORMultiMap[A, B] =
put(node.uniqueAddress, key, value)
@deprecated("Use `put` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def put(node: Cluster, key: A, value: Set[B]): ORMultiMap[A, B] =
put(node.selfUniqueAddress, key, value)
@ -144,9 +154,16 @@ final class ORMultiMap[A, B] private[akka] (
* Java API: Associate an entire set with the key while retaining the history of the previous
* replicated data set.
*/
def put(node: SelfUniqueAddress, key: A, value: java.util.Set[B]): ORMultiMap[A, B] = {
import scala.collection.JavaConverters._
put(node.uniqueAddress, key, value.asScala.toSet)
}
@Deprecated
@deprecated("Use `put` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def put(node: Cluster, key: A, value: java.util.Set[B]): ORMultiMap[A, B] = {
import scala.collection.JavaConverters._
put(node, key, value.asScala.toSet)
put(node.selfUniqueAddress, key, value.asScala.toSet)
}
/**
@ -159,18 +176,27 @@ final class ORMultiMap[A, B] private[akka] (
new ORMultiMap(newUnderlying, withValueDeltas)
}
/**
* Scala API
* Remove an entire set associated with the key.
*/
def remove(key: A)(implicit node: SelfUniqueAddress): ORMultiMap[A, B] = remove(node.uniqueAddress, key)
/**
* Convenience for remove. Requires an implicit Cluster.
* @see [[#remove]]
*/
def -(key: A)(implicit node: Cluster): ORMultiMap[A, B] =
remove(node, key)
@deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def -(key: A)(implicit node: Cluster): ORMultiMap[A, B] = remove(node.selfUniqueAddress, key)
/**
* Java API
* Remove an entire set associated with the key.
*/
def remove(node: Cluster, key: A): ORMultiMap[A, B] =
remove(node.selfUniqueAddress, key)
def remove(node: SelfUniqueAddress, key: A): ORMultiMap[A, B] = remove(node.uniqueAddress, key)
@deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def remove(node: Cluster, key: A): ORMultiMap[A, B] = remove(node.selfUniqueAddress, key)
/**
* INTERNAL API
@ -185,16 +211,22 @@ final class ORMultiMap[A, B] private[akka] (
}
/**
* Scala API: Add an element to a set associated with a key. If there is no existing set then one will be initialised.
* Add an element to a set associated with a key. If there is no existing set then one will be initialised.
* TODO add implicit after deprecated is EOL.
*/
def addBinding(node: SelfUniqueAddress, key: A, element: B): ORMultiMap[A, B] =
addBinding(node.uniqueAddress, key, element)
def addBindingBy(key: A, element: B)(implicit node: SelfUniqueAddress): ORMultiMap[A, B] =
addBinding(node, key, element)
@deprecated("Use `addBinding` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def addBinding(key: A, element: B)(implicit node: Cluster): ORMultiMap[A, B] =
addBinding(node.selfUniqueAddress, key, element)
/**
* Java API: Add an element to a set associated with a key. If there is no existing set then one will be initialised.
*/
@deprecated("Use `addBinding` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def addBinding(node: Cluster, key: A, element: B): ORMultiMap[A, B] =
addBinding(key, element)(node)
addBinding(node.selfUniqueAddress, key, element)
/**
* INTERNAL API
@ -205,18 +237,24 @@ final class ORMultiMap[A, B] private[akka] (
}
/**
* Scala API: Remove an element of a set associated with a key. If there are no more elements in the set then the
* Remove an element of a set associated with a key. If there are no more elements in the set then the
* entire set will be removed.
* TODO add implicit after deprecated is EOL.
*/
def removeBinding(node: SelfUniqueAddress, key: A, element: B): ORMultiMap[A, B] =
removeBinding(node.uniqueAddress, key, element)
def removeBindingBy(key: A, element: B)(implicit node: SelfUniqueAddress): ORMultiMap[A, B] =
removeBinding(node, key, element)
@deprecated("Use `removeBinding` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def removeBinding(key: A, element: B)(implicit node: Cluster): ORMultiMap[A, B] =
removeBinding(node.selfUniqueAddress, key, element)
/**
* Java API: Remove an element of a set associated with a key. If there are no more elements in the set then the
* entire set will be removed.
*/
@Deprecated
@deprecated("Use `removeBinding` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def removeBinding(node: Cluster, key: A, element: B): ORMultiMap[A, B] =
removeBinding(key, element)(node)
removeBinding(node.selfUniqueAddress, key, element)
/**
* INTERNAL API
@ -241,6 +279,13 @@ final class ORMultiMap[A, B] private[akka] (
* and another one is added within the same Update. The order of addition and removal is important in order
* to retain history for replicated data.
*/
def replaceBinding(node: SelfUniqueAddress, key: A, oldElement: B, newElement: B): ORMultiMap[A, B] =
replaceBinding(node.uniqueAddress, key, oldElement, newElement)
def replaceBindingBy(key: A, oldElement: B, newElement: B)(implicit node: SelfUniqueAddress): ORMultiMap[A, B] =
replaceBinding(node, key, oldElement, newElement)
@deprecated("Use `replaceBinding` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def replaceBinding(key: A, oldElement: B, newElement: B)(implicit node: Cluster): ORMultiMap[A, B] =
replaceBinding(node.selfUniqueAddress, key, oldElement, newElement)
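
The `*By` methods (`addBindingBy`, `removeBindingBy`, `replaceBindingBy`) carry the implicit form under a new name because the deprecated `Cluster` overloads still occupy the old signatures, per the `TODO add implicit after deprecated is EOL` notes. A sketch:

```scala
import akka.cluster.ddata.{ ORMultiMap, SelfUniqueAddress }

def rebind(m: ORMultiMap[String, String])(implicit node: SelfUniqueAddress): ORMultiMap[String, String] =
  m.addBindingBy("k", "v1")
    .replaceBindingBy("k", "v1", "v2")
    .removeBindingBy("k", "v2")
```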

akka/cluster/ddata/ORSet.scala

@ -307,14 +307,17 @@ final class ORSet[A] private[akka] (
def size: Int = elementsMap.size
/**
* Adds an element to the set
*/
def +(element: A)(implicit node: Cluster): ORSet[A] = add(node, element)
/** Adds an element to the set. */
def :+(element: A)(implicit node: SelfUniqueAddress): ORSet[A] = add(node, element)
/**
* Adds an element to the set
*/
@deprecated("Use `:+` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def +(element: A)(implicit node: Cluster): ORSet[A] = add(node.selfUniqueAddress, element)
/** Adds an element to the set. */
def add(node: SelfUniqueAddress, element: A): ORSet[A] = add(node.uniqueAddress, element)
@Deprecated
@deprecated("Use `add` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def add(node: Cluster, element: A): ORSet[A] = add(node.selfUniqueAddress, element)
/**
@ -335,13 +338,27 @@ final class ORSet[A] private[akka] (
}
/**
* Scala API
* Removes an element from the set.
*/
def -(element: A)(implicit node: Cluster): ORSet[A] = remove(node, element)
def remove(element: A)(implicit node: SelfUniqueAddress): ORSet[A] = remove(node.uniqueAddress, element)
/**
* Java API
* Removes an element from the set.
*/
def remove(node: SelfUniqueAddress, element: A): ORSet[A] = remove(node.uniqueAddress, element)
/**
* Removes an element from the set.
*/
@deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def -(element: A)(implicit node: Cluster): ORSet[A] = remove(node.selfUniqueAddress, element)
/**
* Removes an element from the set.
*/
@deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def remove(node: Cluster, element: A): ORSet[A] = remove(node.selfUniqueAddress, element)
/**
@ -362,6 +379,9 @@ final class ORSet[A] private[akka] (
* This has the same result as using [[#remove]] for each
* element, but it is more efficient.
*/
def clear(node: SelfUniqueAddress): ORSet[A] = clear(node.uniqueAddress)
@deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def clear(node: Cluster): ORSet[A] = clear(node.selfUniqueAddress)
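
Usage sketch of the replacement `ORSet` operations:

```scala
import akka.cluster.ddata.{ ORSet, SelfUniqueAddress }

def rotate(s: ORSet[String])(implicit node: SelfUniqueAddress): ORSet[String] =
  (s :+ "new").remove("old") // add with the operator, remove implicitly
```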
/**

akka/cluster/ddata/PNCounter.scala

@ -62,67 +62,98 @@ final class PNCounter private[akka] (
* Increment the counter with the delta `n` specified.
* If the delta is negative then it will decrement instead of increment.
*/
def +(n: Long)(implicit node: Cluster): PNCounter = increment(node, n)
def :+(n: Long)(implicit node: SelfUniqueAddress): PNCounter = increment(node.uniqueAddress, n)
@deprecated("Use `:+` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def +(n: Long)(implicit node: Cluster): PNCounter = increment(node.selfUniqueAddress, n)
/**
* Increment the counter with the delta `n` specified.
* If the delta is negative then it will decrement instead of increment.
*/
def +(n: BigInt)(implicit node: Cluster): PNCounter = increment(node, n)
def :+(n: BigInt)(implicit node: SelfUniqueAddress): PNCounter = increment(node.uniqueAddress, n)
/**
* Increment the counter with the delta `n` specified.
* If the delta is negative then it will decrement instead of increment.
*/
def increment(node: Cluster, n: Long = 1): PNCounter =
increment(node.selfUniqueAddress, n)
@deprecated("Use `:+` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def +(n: BigInt)(implicit node: Cluster): PNCounter = increment(node.selfUniqueAddress, n)
/**
* Scala API: Increment the counter with the delta `n` specified.
* If the delta is negative then it will decrement instead of increment.
*/
def increment(node: Cluster, n: BigInt): PNCounter =
increment(node.selfUniqueAddress, n)
def increment(n: Long)(implicit node: SelfUniqueAddress): PNCounter = increment(node.uniqueAddress, n)
@deprecated("Use `increment` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def increment(node: Cluster, n: Long = 1): PNCounter = increment(node.selfUniqueAddress, n)
/**
* Increment the counter with the delta `n` specified.
* If the delta is negative then it will decrement instead of increment.
*/
def increment(n: BigInt)(implicit node: SelfUniqueAddress): PNCounter = increment(node.uniqueAddress, n)
@deprecated("Use `increment` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def increment(node: Cluster, n: BigInt): PNCounter = increment(node.selfUniqueAddress, n)
/**
* Java API: Increment the counter with the delta `n` specified.
* If the delta is negative then it will decrement instead of increment.
*/
def increment(node: Cluster, n: java.math.BigInteger): PNCounter =
increment(node.selfUniqueAddress, n)
def increment(node: SelfUniqueAddress, n: java.math.BigInteger): PNCounter = increment(node.uniqueAddress, n)
/**
* Java API: Increment the counter with the delta `n` specified.
* If the delta is negative then it will decrement instead of increment.
*/
def increment(node: SelfUniqueAddress, n: Long): PNCounter = increment(node.uniqueAddress, n)
@deprecated("Use `increment` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def increment(node: Cluster, n: java.math.BigInteger): PNCounter = increment(node.selfUniqueAddress, n)
/**
* Decrement the counter with the delta `n` specified.
* If the delta is negative then it will increment instead of decrement.
*/
def -(n: Long)(implicit node: Cluster): PNCounter = decrement(node, n)
def decrement(n: Long)(implicit node: SelfUniqueAddress): PNCounter = decrement(node.uniqueAddress, n)
@deprecated("Use `decrement` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def -(n: Long)(implicit node: Cluster): PNCounter = decrement(node.selfUniqueAddress, n)
/**
* Decrement the counter with the delta `n` specified.
* If the delta is negative then it will increment instead of decrement.
*/
def -(n: BigInt)(implicit node: Cluster): PNCounter = decrement(node, n)
def decrement(n: BigInt)(implicit node: SelfUniqueAddress): PNCounter = decrement(node.uniqueAddress, n)
@deprecated("Use `decrement` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def -(n: BigInt)(implicit node: Cluster): PNCounter = decrement(node.selfUniqueAddress, n)
/**
* Decrement the counter with the delta `n` specified.
* If the delta `n` is negative then it will increment instead of decrement.
*/
def decrement(node: Cluster, n: Long = 1): PNCounter =
decrement(node.selfUniqueAddress, n)
def decrement(node: SelfUniqueAddress, n: Long): PNCounter = decrement(node.uniqueAddress, n)
@deprecated("Use `decrement` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def decrement(node: Cluster, n: Long = 1): PNCounter = decrement(node.selfUniqueAddress, n)
/**
* Scala API: Decrement the counter with the delta `n` specified.
* If the delta `n` is negative then it will increment instead of decrement.
*/
def decrement(node: Cluster, n: BigInt): PNCounter =
decrement(node.selfUniqueAddress, n)
def decrement(node: SelfUniqueAddress, n: BigInt): PNCounter = decrement(node.uniqueAddress, n)
@deprecated("Use `decrement` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def decrement(node: Cluster, n: BigInt): PNCounter = decrement(node.selfUniqueAddress, n)
/**
* Java API: Decrement the counter with the delta `n` specified.
* If the delta `n` is negative then it will increment instead of decrement.
*/
def decrement(node: Cluster, n: java.math.BigInteger): PNCounter =
decrement(node.selfUniqueAddress, n)
def decrement(node: SelfUniqueAddress, n: java.math.BigInteger): PNCounter = decrement(node.uniqueAddress, n)
@Deprecated
@deprecated("Use `decrement` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def decrement(node: Cluster, n: java.math.BigInteger): PNCounter = decrement(node.selfUniqueAddress, n)
/** Internal API */
@InternalApi private[akka] def increment(key: UniqueAddress, n: BigInt): PNCounter = change(key, n)
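
As with `GCounter`, every `PNCounter` overload gains a `SelfUniqueAddress` twin; a sketch:

```scala
import akka.cluster.ddata.{ PNCounter, SelfUniqueAddress }

def adjust(c: PNCounter)(implicit node: SelfUniqueAddress): PNCounter =
  (c :+ 5).decrement(2) // net effect: +3
```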

akka/cluster/ddata/PNCounterMap.scala

@ -4,11 +4,11 @@
package akka.cluster.ddata
import akka.cluster.Cluster
import akka.cluster.UniqueAddress
import java.math.BigInteger
import akka.annotation.InternalApi
import akka.cluster.Cluster
import akka.cluster.UniqueAddress
import akka.cluster.ddata.ORMap._
object PNCounterMap {
@ -75,13 +75,24 @@ final class PNCounterMap[A] private[akka] (
* Increment the counter with the delta specified.
* If the delta is negative then it will decrement instead of increment.
*/
def increment(key: A, delta: Long = 1)(implicit node: Cluster): PNCounterMap[A] =
increment(node, key, delta)
def incrementBy(key: A, delta: Long)(implicit node: SelfUniqueAddress): PNCounterMap[A] =
increment(node.uniqueAddress, key, delta)
/**
* Increment the counter with the delta specified.
* If the delta is negative then it will decrement instead of increment.
*/
def increment(key: A, delta: Long = 1)(implicit node: Cluster): PNCounterMap[A] =
increment(node.selfUniqueAddress, key, delta)
/**
* Increment the counter with the delta specified.
* If the delta is negative then it will decrement instead of increment.
*/
def increment(node: SelfUniqueAddress, key: A, delta: Long): PNCounterMap[A] =
increment(node.uniqueAddress, key, delta)
@deprecated("Use `increment` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def increment(node: Cluster, key: A, delta: Long): PNCounterMap[A] =
increment(node.selfUniqueAddress, key, delta)
@ -94,14 +105,28 @@ final class PNCounterMap[A] private[akka] (
/**
* Decrement the counter with the delta specified.
* If the delta is negative then it will increment instead of decrement.
* TODO add implicit after deprecated is EOL.
*/
def decrement(key: A, delta: Long = 1)(implicit node: Cluster): PNCounterMap[A] =
def decrementBy(key: A, delta: Long = 1)(implicit node: SelfUniqueAddress): PNCounterMap[A] =
decrement(node, key, delta)
/**
* Decrement the counter with the delta specified.
* If the delta is negative then it will increment instead of decrement.
* TODO add implicit after deprecated is EOL.
*/
def decrement(node: SelfUniqueAddress, key: A, delta: Long): PNCounterMap[A] =
decrement(node.uniqueAddress, key, delta)
@deprecated("Use `decrement` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def decrement(key: A, delta: Long = 1)(implicit node: Cluster): PNCounterMap[A] =
decrement(node.selfUniqueAddress, key, delta)
/**
* Decrement the counter with the delta specified.
* If the delta is negative then it will increment instead of decrement.
*/
@deprecated("Use `decrement` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def decrement(node: Cluster, key: A, delta: Long): PNCounterMap[A] =
decrement(node.selfUniqueAddress, key, delta)
@ -117,16 +142,16 @@ final class PNCounterMap[A] private[akka] (
* Note that if there is a conflicting update on another node the entry will
* not be removed after merge.
*/
def -(key: A)(implicit node: Cluster): PNCounterMap[A] = remove(node, key)
def remove(key: A)(implicit node: SelfUniqueAddress): PNCounterMap[A] =
remove(node.uniqueAddress, key)
/**
* Removes an entry from the map.
* Note that if there is a conflicting update on another node the entry will
* not be removed after merge.
*/
@deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def remove(node: Cluster, key: A): PNCounterMap[A] =
remove(node.selfUniqueAddress, key)
@deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def -(key: A)(implicit node: Cluster): PNCounterMap[A] = remove(node, key)
/**
* INTERNAL API
*/
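
`incrementBy`/`decrementBy` play the same renaming role as the `*By` methods on `ORMultiMap`, since the implicit-`Cluster` signatures are still taken by the deprecated methods. A sketch:

```scala
import akka.cluster.ddata.{ PNCounterMap, SelfUniqueAddress }

def tally(m: PNCounterMap[String])(implicit node: SelfUniqueAddress): PNCounterMap[String] =
  m.incrementBy("hits", 1).decrementBy("misses", 1)
```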

akka/cluster/ddata/Replicator.scala

@ -100,6 +100,15 @@ object ReplicatorSettings {
*/
@InternalApi private[akka] def roleOption(role: String): Option[String] =
if (role == "") None else Option(role)
/**
* INTERNAL API
* The name of the actor used in DistributedData extensions.
*/
@InternalApi private[akka] def name(system: ActorSystem, modifier: Option[String]): String = {
val name = system.settings.config.getString("akka.cluster.distributed-data.name")
modifier.map(s ⇒ s + name.take(1).toUpperCase + name.drop(1)).getOrElse(name)
}
}
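
The modifier is prepended and the first letter of the configured name upper-cased. A worked example of that logic (treating `"ddataReplicator"`, the assumed `reference.conf` default, as the configured name):

```scala
// Mirrors the capitalization logic in ReplicatorSettings.name.
val name = "ddataReplicator" // assumption: default of akka.cluster.distributed-data.name
val modified = Some("typed").map(s ⇒ s + name.take(1).toUpperCase + name.drop(1))
// modified == Some("typedDdataReplicator"); with None the plain name is returned
```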
/**

akka/cluster/ddata/VersionVector.scala

@ -9,7 +9,6 @@ import scala.annotation.tailrec
import scala.collection.immutable.TreeMap
import akka.cluster.Cluster
import akka.cluster.UniqueAddress
import akka.cluster.UniqueAddress
import akka.annotation.InternalApi
/**
@ -107,7 +106,10 @@ sealed abstract class VersionVector
/**
* Increment the version for the node passed as argument. Returns a new VersionVector.
*/
def +(node: Cluster): VersionVector = increment(node)
def :+(node: SelfUniqueAddress): VersionVector = increment(node)
@deprecated("Use `:+` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def +(node: Cluster): VersionVector = increment(node.selfUniqueAddress)
/**
* INTERNAL API
@ -118,6 +120,9 @@ sealed abstract class VersionVector
/**
* Increment the version for the node passed as argument. Returns a new VersionVector.
*/
def increment(node: SelfUniqueAddress): VersionVector = increment(node.uniqueAddress)
@deprecated("Use `increment` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20")
def increment(node: Cluster): VersionVector = increment(node.selfUniqueAddress)
def isEmpty: Boolean
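
Unlike the CRDT operators above, `VersionVector.:+` takes the `SelfUniqueAddress` as an explicit argument; a sketch:

```scala
import akka.cluster.ddata.{ SelfUniqueAddress, VersionVector }

def tick(vv: VersionVector)(node: SelfUniqueAddress): VersionVector =
  vv :+ node // bumps this node's entry; the Cluster-based `+` is deprecated
```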

akka/cluster/ddata/DurableDataSpec.scala

@ -50,7 +50,7 @@ object DurableDataSpec {
else
sender() ! LoadAllCompleted
case Store(key, data, reply) ⇒
case Store(_, _, reply) ⇒
if (failStore) reply match {
case Some(StoreReply(_, failureMsg, replyTo)) ⇒ replyTo ! failureMsg
case None ⇒
@ -79,8 +79,8 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
override def initialParticipants = roles.size
implicit val cluster = Cluster(system)
val cluster = Cluster(system)
implicit val selfUniqueAddress = DistributedData(system).selfUniqueAddress
val timeout = 14.seconds.dilated // initialization of lmdb can be very slow in CI environment
val writeTwo = WriteTo(2, timeout)
val readTwo = ReadFrom(2, timeout)
@ -123,9 +123,9 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
r ! Get(KeyA, ReadLocal)
expectMsg(NotFound(KeyA, None))
r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
r ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 1)
r ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 1)
r ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 1)
expectMsg(UpdateSuccess(KeyA, None))
expectMsg(UpdateSuccess(KeyA, None))
@ -163,10 +163,10 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
}
enterBarrier("both-initialized")
r ! Update(KeyA, GCounter(), writeTwo)(_ + 1)
r ! Update(KeyA, GCounter(), writeTwo)(_ :+ 1)
expectMsg(UpdateSuccess(KeyA, None))
r ! Update(KeyC, ORSet.empty[String], writeTwo)(_ + myself.name)
r ! Update(KeyC, ORSet.empty[String], writeTwo)(_ :+ myself.name)
expectMsg(UpdateSuccess(KeyC, None))
enterBarrier("update-done-" + testStepCounter)
@ -203,7 +203,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
val r = newReplicator()
runOn(first) {
r ! Update(KeyC, ORSet.empty[String], WriteLocal)(_ + myself.name)
r ! Update(KeyC, ORSet.empty[String], WriteLocal)(_ :+ myself.name)
expectMsg(UpdateSuccess(KeyC, None))
}
@ -213,7 +213,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
// must do one more roundtrip to be sure that KeyB is stored, since Changed might have
// been sent out before storage
r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
r ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 1)
expectMsg(UpdateSuccess(KeyA, None))
watch(r)
@ -254,10 +254,10 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
r ! Get(KeyA, ReadLocal)
expectMsg(NotFound(KeyA, None))
r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
r ! Update(KeyB, GCounter(), WriteLocal)(_ + 1)
r ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 1)
r ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 1)
r ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 1)
r ! Update(KeyB, GCounter(), WriteLocal)(_ :+ 1)
expectMsg(UpdateSuccess(KeyA, None))
expectMsg(UpdateSuccess(KeyA, None))
@ -286,7 +286,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
val r2: ActorRef = newReplicator(sys2)
// it should be possible to update while loading is in progress
r2 ! Update(KeyB, GCounter(), WriteLocal)(_ + 1)
r2 ! Update(KeyB, GCounter(), WriteLocal)(_ :+ 1)
expectMsg(UpdateSuccess(KeyB, None))
// wait until all loaded
@ -325,7 +325,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
Replicator.props(
ReplicatorSettings(system).withDurableStoreProps(testDurableStoreProps(failStore = true))),
"replicator-" + testStepCounter)
r ! Update(KeyA, GCounter(), WriteLocal, request = Some("a"))(_ + 1)
r ! Update(KeyA, GCounter(), WriteLocal, request = Some("a"))(_ :+ 1)
expectMsg(StoreFailure(KeyA, Some("a")))
}
enterBarrierAfterTestStep()

akka/cluster/ddata/DurablePruningSpec.scala

@ -43,7 +43,8 @@ class DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiN
override def initialParticipants = roles.size
implicit val cluster = Cluster(system)
val cluster = Cluster(system)
implicit val selfUniqueAddress = DistributedData(system).selfUniqueAddress
val maxPruningDissemination = 3.seconds
def startReplicator(sys: ActorSystem): ActorRef =
@ -90,7 +91,7 @@ class DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiN
}
}
replicator ! Update(KeyA, GCounter(), WriteLocal)(_ + 3)
replicator ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 3)
expectMsg(UpdateSuccess(KeyA, None))
replicator2.tell(Update(KeyA, GCounter(), WriteLocal)(_.increment(cluster2, 2)), probe2.ref)

akka/cluster/ddata/JepsenInspiredInsertSpec.scala

@ -49,7 +49,8 @@ class JepsenInspiredInsertSpec extends MultiNodeSpec(JepsenInspiredInsertSpec) w
override def initialParticipants = roles.size
implicit val cluster = Cluster(system)
val cluster = Cluster(system)
implicit val selfUniqueAddress = DistributedData(system).selfUniqueAddress
val replicator = DistributedData(system).replicator
val nodes = roles.drop(1) // controller not part of active nodes
val nodeCount = nodes.size
@ -114,7 +115,7 @@ class JepsenInspiredInsertSpec extends MultiNodeSpec(JepsenInspiredInsertSpec) w
val writeProbe = TestProbe()
val writeAcks = myData.map { i ⇒
sleepDelay()
replicator.tell(Update(key, ORSet(), WriteLocal, Some(i))(_ + i), writeProbe.ref)
replicator.tell(Update(key, ORSet(), WriteLocal, Some(i))(_ :+ i), writeProbe.ref)
writeProbe.receiveOne(3.seconds)
}
val successWriteAcks = writeAcks.collect { case success: UpdateSuccess[_] ⇒ success }
@ -147,7 +148,7 @@ class JepsenInspiredInsertSpec extends MultiNodeSpec(JepsenInspiredInsertSpec) w
val writeProbe = TestProbe()
val writeAcks = myData.map { i ⇒
sleepDelay()
replicator.tell(Update(key, ORSet(), writeMajority, Some(i))(_ + i), writeProbe.ref)
replicator.tell(Update(key, ORSet(), writeMajority, Some(i))(_ :+ i), writeProbe.ref)
writeProbe.receiveOne(timeout + 1.second)
}
val successWriteAcks = writeAcks.collect { case success: UpdateSuccess[_] ⇒ success }
@ -163,7 +164,7 @@ class JepsenInspiredInsertSpec extends MultiNodeSpec(JepsenInspiredInsertSpec) w
val readProbe = TestProbe()
replicator.tell(Get(key, readMajority), readProbe.ref)
val result = readProbe.expectMsgPF() { case g @ GetSuccess(`key`, _) ⇒ g.get(key) }
val survivors = result.elements.size
//val survivors = result.elements.size
result.elements should be(expectedData)
}
@ -191,7 +192,7 @@ class JepsenInspiredInsertSpec extends MultiNodeSpec(JepsenInspiredInsertSpec) w
val writeProbe = TestProbe()
val writeAcks = myData.map { i ⇒
sleepDelay()
replicator.tell(Update(key, ORSet(), WriteLocal, Some(i))(_ + i), writeProbe.ref)
replicator.tell(Update(key, ORSet(), WriteLocal, Some(i))(_ :+ i), writeProbe.ref)
writeProbe.receiveOne(3.seconds)
}
val successWriteAcks = writeAcks.collect { case success: UpdateSuccess[_] ⇒ success }
@ -236,7 +237,7 @@ class JepsenInspiredInsertSpec extends MultiNodeSpec(JepsenInspiredInsertSpec) w
val writeProbe = TestProbe()
val writeAcks = myData.map { i ⇒
sleepDelay()
replicator.tell(Update(key, ORSet(), writeMajority, Some(i))(_ + i), writeProbe.ref)
replicator.tell(Update(key, ORSet(), writeMajority, Some(i))(_ :+ i), writeProbe.ref)
writeProbe.receiveOne(timeout + 1.second)
}
val successWriteAcks = writeAcks.collect { case success: UpdateSuccess[_] ⇒ success }
@ -260,7 +261,7 @@ class JepsenInspiredInsertSpec extends MultiNodeSpec(JepsenInspiredInsertSpec) w
val readProbe = TestProbe()
replicator.tell(Get(key, readMajority), readProbe.ref)
val result = readProbe.expectMsgPF() { case g @ GetSuccess(`key`, _) ⇒ g.get(key) }
val survivors = result.elements.size
//val survivors = result.elements.size
result.elements should be(expectedData)
}
// but on the 3 node side, read from majority doesn't mean that we are guaranteed to see

akka/cluster/ddata/PerformanceSpec.scala

@ -69,7 +69,8 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe
override def initialParticipants = roles.size
implicit val cluster = Cluster(system)
val cluster = Cluster(system)
implicit val selfUniqueAddress = DistributedData(system).selfUniqueAddress
val replicator = DistributedData(system).replicator
val timeout = 3.seconds.dilated
val factor = 1 // use 3 here for serious tuning
@ -156,7 +157,7 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe
val n = 1000 * factor
val expectedData = (0 until n).toSet
repeat("ORSet Update WriteLocal", keys, n)({ (key, i, replyTo)
replicator.tell(Update(key, ORSet(), WriteLocal)(_ + i), replyTo)
replicator.tell(Update(key, ORSet(), WriteLocal)(_ :+ i), replyTo)
}, key ⇒ awaitReplicated(key, expectedData))
enterBarrier("after-1")
@ -164,7 +165,7 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe
"be blazingly fast for ORSet Get ReadLocal" taggedAs PerformanceTest in {
val keys = (1 to repeatCount).map(n ⇒ ORSetKey[Int]("A" + n))
repeat("Get ReadLocal", keys, 100000 * factor) { (key, i, replyTo) ⇒
repeat("Get ReadLocal", keys, 100000 * factor) { (key, _, replyTo) ⇒
replicator.tell(Get(key, ReadLocal), replyTo)
}
enterBarrier("after-2")
@ -175,7 +176,7 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe
val n = 200 * factor
val expected = Some((0 until n).toSet)
repeat("ORSet Update WriteLocal + gossip", keys, n, expected) { (key, i, replyTo)
replicator.tell(Update(key, ORSet(), WriteLocal)(_ + i), replyTo)
replicator.tell(Update(key, ORSet(), WriteLocal)(_ :+ i), replyTo)
}
enterBarrier("after-3")
}
@ -185,7 +186,7 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe
val n = 200 * factor
val expected = Some((0 until n).toSet ++ (0 until n).map(-_).toSet)
repeat("ORSet Update WriteLocal existing + gossip", keys, n, expected) { (key, i, replyTo)
replicator.tell(Update(key, ORSet(), WriteLocal)(_ + (-i)), replyTo)
replicator.tell(Update(key, ORSet(), WriteLocal)(_ :+ (-i)), replyTo)
}
enterBarrier("after-4")
}
@ -196,7 +197,7 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe
val expected = Some((0 until n).toSet)
val writeTwo = WriteTo(2, timeout)
repeat("ORSet Update WriteTwo + gossip", keys, n, expected) { (key, i, replyTo)
replicator.tell(Update(key, ORSet(), writeTwo)(_ + i), replyTo)
replicator.tell(Update(key, ORSet(), writeTwo)(_ :+ i), replyTo)
}
enterBarrier("after-5")
}
@ -209,7 +210,7 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe
val latch = TestLatch(n)
val replyTo = system.actorOf(countDownProps(latch))
for (_ ← 0 until n)
replicator.tell(Update(key, GCounter(), WriteLocal)(_ + 1), replyTo)
replicator.tell(Update(key, GCounter(), WriteLocal)(_ :+ 1), replyTo)
Await.ready(latch, 5.seconds + (1.second * factor))
enterBarrier("update-done-6")
runOn(n1) {
@ -247,7 +248,7 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe
val n = 300 * factor
val writeMajority = WriteMajority(timeout)
repeat("ORSet Update one-by-one deltas", keys, n, oneByOne = true) { (key, i, replyTo)
replicator.tell(Update(key, ORSet(), writeMajority)(_ + i), replyTo)
replicator.tell(Update(key, ORSet(), writeMajority)(_ :+ i), replyTo)
}
enterBarrier("after-7")
}
@ -257,7 +258,7 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe
val n = 200 * factor
val writeMajority = WriteMajority(timeout)
repeat("ORSet Update deltas", keys, n, oneByOne = false) { (key, i, replyTo)
replicator.tell(Update(key, ORSet(), writeMajority)(_ + i), replyTo)
replicator.tell(Update(key, ORSet(), writeMajority)(_ :+ i), replyTo)
}
enterBarrier("after-8")
}

akka/cluster/ddata/ReplicatorChaosSpec.scala

@ -43,7 +43,8 @@ class ReplicatorChaosSpec extends MultiNodeSpec(ReplicatorChaosSpec) with STMult
override def initialParticipants = roles.size
implicit val cluster = Cluster(system)
val cluster = Cluster(system)
implicit val selfUniqueAddress = DistributedData(system).selfUniqueAddress
val replicator = system.actorOf(Replicator.props(
ReplicatorSettings(system).withRole("backend").withGossipInterval(1.second)), "replicator")
val timeout = 3.seconds.dilated
@ -104,18 +105,18 @@ class ReplicatorChaosSpec extends MultiNodeSpec(ReplicatorChaosSpec) with STMult
}
runOn(first) {
(0 until 5).foreach { i ⇒
replicator ! Update(KeyA, GCounter(), WriteLocal)(_ + 1)
replicator ! Update(KeyB, PNCounter(), WriteLocal)(_ - 1)
replicator ! Update(KeyC, GCounter(), WriteAll(timeout))(_ + 1)
for (_ ← 0 until 5) {
replicator ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 1)
replicator ! Update(KeyB, PNCounter(), WriteLocal)(_ decrement 1)
replicator ! Update(KeyC, GCounter(), WriteAll(timeout))(_ :+ 1)
}
receiveN(15).map(_.getClass).toSet should be(Set(classOf[UpdateSuccess[_]]))
}
runOn(second) {
replicator ! Update(KeyA, GCounter(), WriteLocal)(_ + 20)
replicator ! Update(KeyB, PNCounter(), WriteTo(2, timeout))(_ + 20)
replicator ! Update(KeyC, GCounter(), WriteAll(timeout))(_ + 20)
replicator ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 20)
replicator ! Update(KeyB, PNCounter(), WriteTo(2, timeout))(_ :+ 20)
replicator ! Update(KeyC, GCounter(), WriteAll(timeout))(_ :+ 20)
receiveN(3).toSet should be(Set(
UpdateSuccess(KeyA, None),
UpdateSuccess(KeyB, None), UpdateSuccess(KeyC, None)))
@ -123,23 +124,23 @@ class ReplicatorChaosSpec extends MultiNodeSpec(ReplicatorChaosSpec) with STMult
replicator ! Update(KeyE, GSet(), WriteLocal)(_ + "e1" + "e2")
expectMsg(UpdateSuccess(KeyE, None))
replicator ! Update(KeyF, ORSet(), WriteLocal)(_ + "e1" + "e2")
replicator ! Update(KeyF, ORSet(), WriteLocal)(_ :+ "e1" :+ "e2")
expectMsg(UpdateSuccess(KeyF, None))
}
runOn(fourth) {
replicator ! Update(KeyD, GCounter(), WriteLocal)(_ + 40)
replicator ! Update(KeyD, GCounter(), WriteLocal)(_ :+ 40)
expectMsg(UpdateSuccess(KeyD, None))
replicator ! Update(KeyE, GSet(), WriteLocal)(_ + "e2" + "e3")
expectMsg(UpdateSuccess(KeyE, None))
replicator ! Update(KeyF, ORSet(), WriteLocal)(_ + "e2" + "e3")
replicator ! Update(KeyF, ORSet(), WriteLocal)(_ :+ "e2" :+ "e3")
expectMsg(UpdateSuccess(KeyF, None))
}
runOn(fifth) {
replicator ! Update(KeyX, GCounter(), WriteTo(2, timeout))(_ + 50)
replicator ! Update(KeyX, GCounter(), WriteTo(2, timeout))(_ :+ 50)
expectMsg(UpdateSuccess(KeyX, None))
replicator ! Delete(KeyX, WriteLocal)
expectMsg(DeleteSuccess(KeyX, None))
@ -168,22 +169,22 @@ class ReplicatorChaosSpec extends MultiNodeSpec(ReplicatorChaosSpec) with STMult
enterBarrier("split")
runOn(first) {
replicator ! Update(KeyA, GCounter(), WriteTo(2, timeout))(_ + 1)
replicator ! Update(KeyA, GCounter(), WriteTo(2, timeout))(_ :+ 1)
expectMsg(UpdateSuccess(KeyA, None))
}
runOn(third) {
replicator ! Update(KeyA, GCounter(), WriteTo(2, timeout))(_ + 2)
replicator ! Update(KeyA, GCounter(), WriteTo(2, timeout))(_ :+ 2)
expectMsg(UpdateSuccess(KeyA, None))
replicator ! Update(KeyE, GSet(), WriteTo(2, timeout))(_ + "e4")
expectMsg(UpdateSuccess(KeyE, None))
replicator ! Update(KeyF, ORSet(), WriteTo(2, timeout))(_ - "e2")
replicator ! Update(KeyF, ORSet(), WriteTo(2, timeout))(_ remove "e2")
expectMsg(UpdateSuccess(KeyF, None))
}
runOn(fourth) {
replicator ! Update(KeyD, GCounter(), WriteTo(2, timeout))(_ + 1)
replicator ! Update(KeyD, GCounter(), WriteTo(2, timeout))(_ :+ 1)
expectMsg(UpdateSuccess(KeyD, None))
}
enterBarrier("update-during-split")

akka/cluster/ddata/ReplicatorDeltaSpec.scala

@ -141,7 +141,8 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult
override def initialParticipants = roles.size
implicit val cluster = Cluster(system)
val cluster = Cluster(system)
implicit val selfUniqueAddress = DistributedData(system).selfUniqueAddress
val fullStateReplicator = system.actorOf(Replicator.props(
ReplicatorSettings(system).withGossipInterval(1.second).withDeltaCrdtEnabled(false)), "fullStateReplicator")
val deltaReplicator = {
@ -199,12 +200,12 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult
runOn(first) {
// by setting something for each key we don't have to worry about NotFound
List(KeyA, KeyB, KeyC).foreach { key ⇒
fullStateReplicator ! Update(key, PNCounter.empty, WriteLocal)(_ + 1)
deltaReplicator ! Update(key, PNCounter.empty, WriteLocal)(_ + 1)
fullStateReplicator ! Update(key, PNCounter.empty, WriteLocal)(_ :+ 1)
deltaReplicator ! Update(key, PNCounter.empty, WriteLocal)(_ :+ 1)
}
List(KeyD, KeyE, KeyF).foreach { key ⇒
fullStateReplicator ! Update(key, ORSet.empty[String], WriteLocal)(_ + "a")
deltaReplicator ! Update(key, ORSet.empty[String], WriteLocal)(_ + "a")
fullStateReplicator ! Update(key, ORSet.empty[String], WriteLocal)(_ :+ "a")
deltaReplicator ! Update(key, ORSet.empty[String], WriteLocal)(_ :+ "a")
}
}
enterBarrier("updated-1")
@ -232,8 +233,8 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult
"work with write consistency" in {
runOn(first) {
val p1 = TestProbe()
fullStateReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "A"), p1.ref)
deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "A"), p1.ref)
fullStateReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ :+ "A"), p1.ref)
deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ :+ "A"), p1.ref)
p1.expectMsgType[UpdateSuccess[_]]
p1.expectMsgType[UpdateSuccess[_]]
}
@ -248,9 +249,9 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult
// retry with full state to sort it out
runOn(first) {
val p1 = TestProbe()
deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "B"), p1.ref)
deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "C"), p1.ref)
deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "D"), p1.ref)
deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ :+ "B"), p1.ref)
deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ :+ "C"), p1.ref)
deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ :+ "D"), p1.ref)
p1.expectMsgType[UpdateSuccess[_]]
p1.expectMsgType[UpdateSuccess[_]]
p1.expectMsgType[UpdateSuccess[_]]
@ -262,7 +263,7 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult
// add same to the fullStateReplicator so they are in sync
runOn(first) {
val p1 = TestProbe()
fullStateReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "A" + "B" + "C" + "D"), p1.ref)
fullStateReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ :+ "A" :+ "B" :+ "C" :+ "D"), p1.ref)
p1.expectMsgType[UpdateSuccess[_]]
}
enterBarrier("write-3")
@ -366,22 +367,22 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult
op match {
case Delay(d) ⇒ Thread.sleep(d)
case Incr(key, n, consistency) ⇒
fullStateReplicator ! Update(key, PNCounter.empty, consistency)(_ + n)
deltaReplicator ! Update(key, PNCounter.empty, consistency)(_ + n)
fullStateReplicator ! Update(key, PNCounter.empty, consistency)(_ :+ n)
deltaReplicator ! Update(key, PNCounter.empty, consistency)(_ :+ n)
case Decr(key, n, consistency) ⇒
fullStateReplicator ! Update(key, PNCounter.empty, consistency)(_ - n)
deltaReplicator ! Update(key, PNCounter.empty, consistency)(_ - n)
fullStateReplicator ! Update(key, PNCounter.empty, consistency)(_ decrement n)
deltaReplicator ! Update(key, PNCounter.empty, consistency)(_ decrement n)
case Add(key, elem, consistency) ⇒
// to have a deterministic result when mixing add/remove we can only perform
// the ORSet operations from one node
runOn((if (key == KeyF) List(first) else List(first, second, third)): _*) {
fullStateReplicator ! Update(key, ORSet.empty[String], consistency)(_ + elem)
deltaReplicator ! Update(key, ORSet.empty[String], consistency)(_ + elem)
fullStateReplicator ! Update(key, ORSet.empty[String], consistency)(_ :+ elem)
deltaReplicator ! Update(key, ORSet.empty[String], consistency)(_ :+ elem)
}
case Remove(key, elem, consistency) ⇒
runOn(first) {
fullStateReplicator ! Update(key, ORSet.empty[String], consistency)(_ - elem)
deltaReplicator ! Update(key, ORSet.empty[String], consistency)(_ - elem)
fullStateReplicator ! Update(key, ORSet.empty[String], consistency)(_ remove elem)
deltaReplicator ! Update(key, ORSet.empty[String], consistency)(_ remove elem)
}
}
}

akka/cluster/ddata/ReplicatorMapDeltaSpec.scala

@ -164,11 +164,11 @@ object ReplicatorMapDeltaSpec extends MultiNodeConfig {
}.toVector
}
def addElementToORMap(om: ORMap[String, ORSet[String]], key: String, element: String)(implicit node: Cluster) =
om.updated(node, key, ORSet.empty[String])(_.add(node, element))
def addElementToORMap(om: ORMap[String, ORSet[String]], key: String, element: String)(implicit node: SelfUniqueAddress) =
om.updated(node, key, ORSet.empty[String])(_ :+ element)
def removeElementFromORMap(om: ORMap[String, ORSet[String]], key: String, element: String)(implicit node: Cluster) =
om.updated(node, key, ORSet.empty[String])(_.remove(node, element))
def removeElementFromORMap(om: ORMap[String, ORSet[String]], key: String, element: String)(implicit node: SelfUniqueAddress) =
om.updated(node, key, ORSet.empty[String])(_.remove(element))
}
class ReplicatorMapDeltaSpecMultiJvmNode1 extends ReplicatorMapDeltaSpec
@ -182,7 +182,8 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with
override def initialParticipants = roles.size
implicit val cluster = Cluster(system)
val cluster = Cluster(system)
implicit val selfUniqueAddress = DistributedData(system).selfUniqueAddress
val fullStateReplicator = system.actorOf(Replicator.props(
ReplicatorSettings(system).withGossipInterval(1.second).withDeltaCrdtEnabled(false)), "fullStateReplicator")
val deltaReplicator = {
@ -241,20 +242,20 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with
runOn(first) {
// by setting something for each key we don't have to worry about NotFound
List(KeyA, KeyB, KeyC).foreach { key ⇒
fullStateReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ increment key._2)
deltaReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ increment key._2)
fullStateReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_.incrementBy(key._2, 1))
deltaReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_.incrementBy(key._2, 1))
}
List(KeyD, KeyE, KeyF).foreach { key ⇒
fullStateReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ + (key._2 → Set("a")))
deltaReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ + (key._2 → Set("a")))
fullStateReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ :+ (key._2 → Set("a")))
deltaReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ :+ (key._2 → Set("a")))
}
List(KeyG, KeyH, KeyI).foreach { key ⇒
fullStateReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ + (key._2 → Set("a")))
deltaReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ + (key._2 → Set("a")))
fullStateReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ :+ (key._2 → Set("a")))
deltaReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ :+ (key._2 → Set("a")))
}
List(KeyJ, KeyK, KeyL).foreach { key ⇒
fullStateReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(_ + (key._2 → (ORSet.empty + "a")))
deltaReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(_ + (key._2 → (ORSet.empty + "a")))
fullStateReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(_ :+ (key._2 → (ORSet.empty :+ "a")))
deltaReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(_ :+ (key._2 → (ORSet.empty :+ "a")))
}
}
}
enterBarrier("updated-1")
@ -271,7 +272,7 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with
val p = TestProbe()
List(KeyD, KeyE, KeyF).foreach { key ⇒
fullStateReplicator.tell(Get(key._1, ReadLocal), p.ref)
val res = p.expectMsgType[GetSuccess[ORMultiMap[String, String]]].dataValue.get(key._2) should ===(Some(Set("a")))
p.expectMsgType[GetSuccess[ORMultiMap[String, String]]].dataValue.get(key._2) should ===(Some(Set("a")))
}
}
awaitAssert {
@ -300,7 +301,7 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with
system.eventStream.subscribe(errorLogProbe.ref, classOf[Error])
runOn(first) {
for (_ ← 1 to N; key ← List(KeyA, KeyB)) {
ordinaryReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ increment key._2)
ordinaryReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_.incrementBy(key._2, 1))
}
}
enterBarrier("updated-2")
@ -333,44 +334,44 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with
log.debug("operation: {}", op)
op match {
case Delay(d) ⇒ Thread.sleep(d)
case Incr(key, n, consistency) ⇒
fullStateReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ increment (key._2, n))
deltaReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ increment (key._2, n))
case Decr(key, n, consistency) ⇒
fullStateReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ decrement (key._2, n))
deltaReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ decrement (key._2, n))
case AddVD(key, elem, consistency) ⇒
case Incr(key, n, _) ⇒
fullStateReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ incrementBy (key._2, n))
deltaReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ incrementBy (key._2, n))
case Decr(key, n, _) ⇒
fullStateReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ decrementBy (key._2, n))
deltaReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ decrementBy (key._2, n))
case AddVD(key, elem, _) ⇒
// to have a deterministic result when mixing add/remove we can only perform
// the ORSet operations from one node
runOn((if (key == KeyF) List(first) else List(first, second, third)): _*) {
fullStateReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ addBinding (key._2, elem))
deltaReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ addBinding (key._2, elem))
fullStateReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ addBindingBy (key._2, elem))
deltaReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ addBindingBy (key._2, elem))
}
case RemoveVD(key, elem, consistency) ⇒
case RemoveVD(key, elem, _) ⇒
runOn(first) {
fullStateReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ removeBinding (key._2, elem))
deltaReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ removeBinding (key._2, elem))
fullStateReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ removeBindingBy (key._2, elem))
deltaReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ removeBindingBy (key._2, elem))
}
case AddNoVD(key, elem, consistency) ⇒
case AddNoVD(key, elem, _) ⇒
// to have a deterministic result when mixing add/remove we can only perform
// the ORSet operations from one node
runOn((if (key == KeyI) List(first) else List(first, second, third)): _*) {
fullStateReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ addBinding (key._2, elem))
deltaReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ addBinding (key._2, elem))
fullStateReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ addBindingBy (key._2, elem))
deltaReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ addBindingBy (key._2, elem))
}
case RemoveNoVD(key, elem, consistency) ⇒
case RemoveNoVD(key, elem, _) ⇒
runOn(first) {
fullStateReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ removeBinding (key._2, elem))
deltaReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ removeBinding (key._2, elem))
fullStateReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ removeBindingBy (key._2, elem))
deltaReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ removeBindingBy (key._2, elem))
}
case AddOM(key, elem, consistency) ⇒
case AddOM(key, elem, _) ⇒
// to have a deterministic result when mixing add/remove we can only perform
// the ORSet operations from one node
runOn((if (key == KeyL) List(first) else List(first, second, third)): _*) {
fullStateReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(om ⇒ addElementToORMap(om, key._2, elem))
deltaReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(om ⇒ addElementToORMap(om, key._2, elem))
}
case RemoveOM(key, elem, consistency) ⇒
case RemoveOM(key, elem, _) ⇒
runOn(first) {
fullStateReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(om ⇒ removeElementFromORMap(om, key._2, elem))
deltaReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(om ⇒ removeElementFromORMap(om, key._2, elem))
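Taken together, the call-site migration off the deprecated Cluster-based API follows one pattern: resolve the SelfUniqueAddress once from the extension, keep it implicit, and use the new named or symbolic operations. A minimal self-contained sketch, not part of this commit (the system name and key are illustrative):

import akka.actor.ActorSystem
import akka.cluster.ddata._
import akka.cluster.ddata.Replicator._
import com.typesafe.config.ConfigFactory

object MigratedUpdateSketch extends App {
  // illustrative setup; DistributedData requires the cluster actor provider
  val system = ActorSystem("Sketch", ConfigFactory.parseString("akka.actor.provider = cluster"))
  implicit val node: SelfUniqueAddress = DistributedData(system).selfUniqueAddress
  val replicator = DistributedData(system).replicator

  val SetKey = ORSetKey[String]("sketch-set")
  // `:+` and `remove` pick up the implicit SelfUniqueAddress
  replicator ! Update(SetKey, ORSet.empty[String], WriteLocal)(_ :+ "a")  // was _ + "a"
  replicator ! Update(SetKey, ORSet.empty[String], WriteLocal)(_ remove "a") // was _ - "a"
}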
View file
@ -42,7 +42,8 @@ class ReplicatorORSetDeltaSpec extends MultiNodeSpec(ReplicatorORSetDeltaSpec) w
override def initialParticipants = roles.size
implicit val cluster = Cluster(system)
val cluster = Cluster(system)
implicit val selfUniqueAddress = DistributedData(system).selfUniqueAddress
val replicator = system.actorOf(Replicator.props(
ReplicatorSettings(system).withGossipInterval(1.second)), "replicator")
val timeout = 3.seconds.dilated
@ -88,7 +89,7 @@ class ReplicatorORSetDeltaSpec extends MultiNodeSpec(ReplicatorORSetDeltaSpec) w
}
runOn(first) {
replicator ! Update(KeyA, ORSet.empty[String], WriteLocal)(_ + "a")
replicator ! Update(KeyA, ORSet.empty[String], WriteLocal)(_ :+ "a")
expectMsg(UpdateSuccess(KeyA, None))
}
@ -108,11 +109,11 @@ class ReplicatorORSetDeltaSpec extends MultiNodeSpec(ReplicatorORSetDeltaSpec) w
enterBarrier("split")
runOn(first) {
replicator ! Update(KeyA, ORSet.empty[String], WriteLocal)(_ + "b")
replicator ! Update(KeyA, ORSet.empty[String], WriteLocal)(_ :+ "b")
expectMsg(UpdateSuccess(KeyA, None))
}
runOn(second) {
replicator ! Update(KeyA, ORSet.empty[String], WriteLocal)(_ + "d")
replicator ! Update(KeyA, ORSet.empty[String], WriteLocal)(_ :+ "d")
expectMsg(UpdateSuccess(KeyA, None))
}
runOn(first, second) {
@ -129,7 +130,7 @@ class ReplicatorORSetDeltaSpec extends MultiNodeSpec(ReplicatorORSetDeltaSpec) w
runOn(first) {
// delta for "c" will be sent to third, but it has not received the previous delta for "b"
replicator ! Update(KeyA, ORSet.empty[String], WriteLocal)(_ + "c")
replicator ! Update(KeyA, ORSet.empty[String], WriteLocal)(_ :+ "c")
expectMsg(UpdateSuccess(KeyA, None))
// let the delta be propagated (will not fail if it takes longer)
Thread.sleep(1000)
@ -154,7 +155,7 @@ class ReplicatorORSetDeltaSpec extends MultiNodeSpec(ReplicatorORSetDeltaSpec) w
// and now the delta seqNr should be in sync again
runOn(first) {
replicator ! Update(KeyA, ORSet.empty[String], WriteLocal)(_ + "e")
replicator ! Update(KeyA, ORSet.empty[String], WriteLocal)(_ :+ "e")
expectMsg(UpdateSuccess(KeyA, None))
}
assertValue(KeyA, Set("a", "b", "c", "d", "e"))
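The set the nodes converge on is determined by the ORSet merge function alone, which is why the delta and full-state paths may differ in timing but not in outcome. A standalone sketch of that convergence, with fabricated node addresses (nothing below is from this commit):

import akka.actor.Address
import akka.cluster.UniqueAddress
import akka.cluster.ddata.{ ORSet, SelfUniqueAddress }

object ORSetMergeSketch extends App {
  val nodeA = SelfUniqueAddress(UniqueAddress(Address("akka", "Sys", "hostA", 2552), 1L))
  val nodeB = SelfUniqueAddress(UniqueAddress(Address("akka", "Sys", "hostB", 2552), 2L))

  val onA = ORSet.empty[String].:+("a")(nodeA).:+("b")(nodeA) // updates applied at replica A
  val onB = ORSet.empty[String].:+("d")(nodeB)                // concurrent update at replica B

  // merge is commutative, so both replicas converge on the same elements
  assert(onA.merge(onB).elements == Set("a", "b", "d"))
  assert(onB.merge(onA).elements == onA.merge(onB).elements)
}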
View file
@ -38,7 +38,8 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST
override def initialParticipants = roles.size
implicit val cluster = Cluster(system)
val cluster = Cluster(system)
implicit val selfUniqueAddress = DistributedData(system).selfUniqueAddress
val maxPruningDissemination = 3.seconds
val replicator = system.actorOf(Replicator.props(
ReplicatorSettings(system).withGossipInterval(1.second)
@ -83,19 +84,19 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST
member.uniqueAddress
}
replicator ! Update(KeyA, GCounter(), WriteAll(timeout))(_ + 3)
replicator ! Update(KeyA, GCounter(), WriteAll(timeout))(_ :+ 3)
expectMsg(UpdateSuccess(KeyA, None))
replicator ! Update(KeyB, ORSet(), WriteAll(timeout))(_ + "a" + "b" + "c")
replicator ! Update(KeyB, ORSet(), WriteAll(timeout))(_ :+ "a" :+ "b" :+ "c")
expectMsg(UpdateSuccess(KeyB, None))
replicator ! Update(KeyC, PNCounterMap.empty[String], WriteAll(timeout)) { _ increment "x" increment "y" }
replicator ! Update(KeyC, PNCounterMap.empty[String], WriteAll(timeout)) { _.incrementBy("x", 1).incrementBy("y", 1) }
expectMsg(UpdateSuccess(KeyC, None))
replicator ! Update(KeyD, ORMultiMap.empty[String, String], WriteAll(timeout)) { _ + ("a" → Set("A")) }
replicator ! Update(KeyD, ORMultiMap.empty[String, String], WriteAll(timeout)) { _ :+ ("a" → Set("A")) }
expectMsg(UpdateSuccess(KeyD, None))
replicator ! Update(KeyE, ORMap.empty[String, GSet[String]], WriteAll(timeout)) { _ + ("a" → GSet.empty[String].add("A")) }
replicator ! Update(KeyE, ORMap.empty[String, GSet[String]], WriteAll(timeout)) { _ :+ ("a" → GSet.empty[String].add("A")) }
expectMsg(UpdateSuccess(KeyE, None))
enterBarrier("updates-done")
@ -125,7 +126,7 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST
enterBarrier("get-old")
runOn(third) {
replicator ! Update(KeyE, ORMap.empty[String, GSet[String]], WriteLocal) { _ - "a" }
replicator ! Update(KeyE, ORMap.empty[String, GSet[String]], WriteLocal) { _ remove "a" }
expectMsg(UpdateSuccess(KeyE, None))
}
@ -206,7 +207,7 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST
def updateAfterPruning(expectedValue: Int): Unit = {
replicator ! Update(KeyA, GCounter(), WriteAll(timeout), None) { existing ⇒
// inject data from removed node to simulate bad data
existing.merge(oldCounter) + 1
existing.merge(oldCounter) :+ 1
}
expectMsgPF() {
case UpdateSuccess(KeyA, _) ⇒
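The counter arithmetic behind this check is plain state-based merging: each node's contribution is tracked separately and merge keeps the per-node maximum, so re-injected state from a removed node simply folds back in until pruning removes it again. A small sketch with fabricated addresses (illustrative only, not from this commit):

import akka.actor.Address
import akka.cluster.UniqueAddress
import akka.cluster.ddata.{ GCounter, SelfUniqueAddress }

object GCounterMergeSketch extends App {
  implicit val self: SelfUniqueAddress =
    SelfUniqueAddress(UniqueAddress(Address("akka", "Sys", "hostA", 2552), 1L))
  val other = SelfUniqueAddress(UniqueAddress(Address("akka", "Sys", "hostB", 2552), 2L))

  val mine = GCounter() :+ 3                // this node's contributions
  val theirs = GCounter.empty.:+(2L)(other) // another node's contributions
  // per-node maxima survive the merge: 3 + 2
  assert(mine.merge(theirs).value == 5)
}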
View file
@ -40,7 +40,8 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
override def initialParticipants = roles.size
implicit val cluster = Cluster(system)
val cluster = Cluster(system)
implicit val selfUniqueAddress = DistributedData(system).selfUniqueAddress
val replicator = system.actorOf(Replicator.props(
ReplicatorSettings(system).withGossipInterval(1.second).withMaxDeltaElements(10)), "replicator")
val timeout = 3.seconds.dilated
@ -100,8 +101,8 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
replicator ! Get(KeyA, ReadLocal)
expectMsg(NotFound(KeyA, None))
val c3 = GCounter() + 3
replicator ! Update(KeyA, GCounter(), WriteLocal)(_ + 3)
val c3 = GCounter() :+ 3
replicator ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 3)
expectMsg(UpdateSuccess(KeyA, None))
replicator ! Get(KeyA, ReadLocal)
expectMsg(GetSuccess(KeyA, None)(c3)).dataValue should be(c3)
@ -111,31 +112,31 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
replicator ! Subscribe(KeyA, changedProbe2.ref)
changedProbe2.expectMsg(Changed(KeyA)(c3)).dataValue should be(c3)
val c4 = c3 + 1
val c4 = c3 :+ 1
// too strong consistency level
replicator ! Update(KeyA, GCounter(), writeTwo)(_ + 1)
replicator ! Update(KeyA, GCounter(), writeTwo)(_ :+ 1)
expectMsg(timeout + 1.second, UpdateTimeout(KeyA, None))
replicator ! Get(KeyA, ReadLocal)
expectMsg(GetSuccess(KeyA, None)(c4)).dataValue should be(c4)
changedProbe.expectMsg(Changed(KeyA)(c4)).dataValue should be(c4)
val c5 = c4 + 1
val c5 = c4 :+ 1
// majority write succeeds in a single-node cluster
replicator ! Update(KeyA, GCounter(), writeMajority)(_ + 1)
replicator ! Update(KeyA, GCounter(), writeMajority)(_ :+ 1)
expectMsg(UpdateSuccess(KeyA, None))
replicator ! Get(KeyA, readMajority)
expectMsg(GetSuccess(KeyA, None)(c5)).dataValue should be(c5)
changedProbe.expectMsg(Changed(KeyA)(c5)).dataValue should be(c5)
val c6 = c5 + 1
replicator ! Update(KeyA, GCounter(), writeAll)(_ + 1)
val c6 = c5 :+ 1
replicator ! Update(KeyA, GCounter(), writeAll)(_ :+ 1)
expectMsg(UpdateSuccess(KeyA, None))
replicator ! Get(KeyA, readAll)
expectMsg(GetSuccess(KeyA, None)(c6)).dataValue should be(c6)
changedProbe.expectMsg(Changed(KeyA)(c6)).dataValue should be(c6)
val c9 = GCounter() + 9
replicator ! Update(KeyX, GCounter(), WriteLocal)(_ + 9)
val c9 = GCounter() :+ 9
replicator ! Update(KeyX, GCounter(), WriteLocal)(_ :+ 9)
expectMsg(UpdateSuccess(KeyX, None))
changedProbe.expectMsg(Changed(KeyX)(c9)).dataValue should be(c9)
replicator ! Delete(KeyX, WriteLocal, Some(777))
@ -145,7 +146,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
expectMsg(DataDeleted(KeyX, Some(789)))
replicator ! Get(KeyX, readAll, Some(456))
expectMsg(DataDeleted(KeyX, Some(456)))
replicator ! Update(KeyX, GCounter(), WriteLocal, Some(123))(_ + 1)
replicator ! Update(KeyX, GCounter(), WriteLocal, Some(123))(_ :+ 1)
expectMsg(DataDeleted(KeyX, Some(123)))
replicator ! Delete(KeyX, WriteLocal, Some(555))
expectMsg(DataDeleted(KeyX, Some(555)))
@ -214,11 +215,11 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
runOn(first, second) {
// start with 20 on both nodes
replicator ! Update(KeyB, GCounter(), WriteLocal)(_ + 20)
replicator ! Update(KeyB, GCounter(), WriteLocal)(_ :+ 20)
expectMsg(UpdateSuccess(KeyB, None))
// add 1 on both nodes using WriteTwo
replicator ! Update(KeyB, GCounter(), writeTwo)(_ + 1)
replicator ! Update(KeyB, GCounter(), writeTwo)(_ :+ 1)
expectMsg(UpdateSuccess(KeyB, None))
// the total, after replication should be 42
@ -232,7 +233,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
runOn(first, second) {
// add 1 on both nodes using WriteAll
replicator ! Update(KeyB, GCounter(), writeAll)(_ + 1)
replicator ! Update(KeyB, GCounter(), writeAll)(_ :+ 1)
expectMsg(UpdateSuccess(KeyB, None))
// the total, after replication should be 44
@ -246,7 +247,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
runOn(first, second) {
// add 1 on both nodes using WriteMajority
replicator ! Update(KeyB, GCounter(), writeMajority)(_ + 1)
replicator ! Update(KeyB, GCounter(), writeMajority)(_ :+ 1)
expectMsg(UpdateSuccess(KeyB, None))
// the total, after replication should be 46
@ -267,14 +268,14 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
}
runOn(first) {
replicator ! Update(KeyC, GCounter(), writeTwo)(_ + 30)
replicator ! Update(KeyC, GCounter(), writeTwo)(_ :+ 30)
expectMsg(UpdateSuccess(KeyC, None))
changedProbe.expectMsgPF() { case c @ Changed(KeyC) ⇒ c.get(KeyC).value } should be(30)
replicator ! Update(KeyY, GCounter(), writeTwo)(_ + 30)
replicator ! Update(KeyY, GCounter(), writeTwo)(_ :+ 30)
expectMsg(UpdateSuccess(KeyY, None))
replicator ! Update(KeyZ, GCounter(), writeMajority)(_ + 30)
replicator ! Update(KeyZ, GCounter(), writeMajority)(_ :+ 30)
expectMsg(UpdateSuccess(KeyZ, None))
}
enterBarrier("update-c30")
@ -286,7 +287,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
changedProbe.expectMsgPF() { case c @ Changed(KeyC) ⇒ c.get(KeyC).value } should be(30)
// replicate with gossip after WriteLocal
replicator ! Update(KeyC, GCounter(), WriteLocal)(_ + 1)
replicator ! Update(KeyC, GCounter(), WriteLocal)(_ :+ 1)
expectMsg(UpdateSuccess(KeyC, None))
changedProbe.expectMsgPF() { case c @ Changed(KeyC) ⇒ c.get(KeyC).value } should be(31)
@ -320,7 +321,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
val c31 = expectMsgPF() { case g @ GetSuccess(KeyC, _) ⇒ g.get(KeyC) }
c31.value should be(31)
replicator ! Update(KeyC, GCounter(), WriteLocal)(_ + 1)
replicator ! Update(KeyC, GCounter(), WriteLocal)(_ :+ 1)
expectMsg(UpdateSuccess(KeyC, None))
within(5.seconds) {
@ -337,7 +338,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
"converge after partition" in {
runOn(first) {
replicator ! Update(KeyD, GCounter(), writeTwo)(_ + 40)
replicator ! Update(KeyD, GCounter(), writeTwo)(_ :+ 40)
expectMsg(UpdateSuccess(KeyD, None))
testConductor.blackhole(first, second, Direction.Both).await
@ -348,15 +349,15 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
replicator ! Get(KeyD, ReadLocal)
val c40 = expectMsgPF() { case g @ GetSuccess(KeyD, _) ⇒ g.get(KeyD) }
c40.value should be(40)
replicator ! Update(KeyD, GCounter() + 1, writeTwo)(_ + 1)
replicator ! Update(KeyD, GCounter() :+ 1, writeTwo)(_ :+ 1)
expectMsg(timeout + 1.second, UpdateTimeout(KeyD, None))
replicator ! Update(KeyD, GCounter(), writeTwo)(_ + 1)
replicator ! Update(KeyD, GCounter(), writeTwo)(_ :+ 1)
expectMsg(timeout + 1.second, UpdateTimeout(KeyD, None))
}
runOn(first) {
for (n ← 1 to 30) {
val KeyDn = GCounterKey("D" + n)
replicator ! Update(KeyDn, GCounter(), WriteLocal)(_ + n)
replicator ! Update(KeyDn, GCounter(), WriteLocal)(_ :+ n)
expectMsg(UpdateSuccess(KeyDn, None))
}
}
@ -400,7 +401,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
enterBarrier("3-nodes")
runOn(first, second, third) {
replicator ! Update(KeyE, GCounter(), writeMajority)(_ + 50)
replicator ! Update(KeyE, GCounter(), writeMajority)(_ :+ 50)
expectMsg(UpdateSuccess(KeyE, None))
}
enterBarrier("write-initial-majority")
@ -419,7 +420,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
enterBarrier("blackhole-third")
runOn(second) {
replicator ! Update(KeyE, GCounter(), WriteLocal)(_ + 1)
replicator ! Update(KeyE, GCounter(), WriteLocal)(_ :+ 1)
expectMsg(UpdateSuccess(KeyE, None))
}
enterBarrier("local-update-from-second")
@ -432,7 +433,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
probe2.expectMsgType[GetSuccess[_]]
replicator.tell(Update(KeyE, GCounter(), writeMajority, None) { data ⇒
probe1.ref ! data.value
data + 1
data :+ 1
}, probe2.ref)
// verify read your own writes, without waiting for the UpdateSuccess reply
// note that the order of the replies is not defined, and therefore we use separate probes
@ -449,13 +450,13 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
val probe1 = TestProbe()
replicator.tell(Get(KeyE, readMajority), probe1.ref)
probe1.expectMsgType[GetSuccess[_]]
replicator.tell(Update(KeyE, GCounter(), writeMajority, Some(153))(_ + 1), probe1.ref)
replicator.tell(Update(KeyE, GCounter(), writeMajority, Some(153))(_ :+ 1), probe1.ref)
// verify read your own writes, without waiting for the UpdateSuccess reply
// note that the order of the replies is not defined, and therefore we use separate probes
val probe2 = TestProbe()
replicator.tell(Update(KeyE, GCounter(), writeMajority, Some(154))(_ + 1), probe2.ref)
replicator.tell(Update(KeyE, GCounter(), writeMajority, Some(154))(_ :+ 1), probe2.ref)
val probe3 = TestProbe()
replicator.tell(Update(KeyE, GCounter(), writeMajority, Some(155))(_ + 1), probe3.ref)
replicator.tell(Update(KeyE, GCounter(), writeMajority, Some(155))(_ :+ 1), probe3.ref)
val probe5 = TestProbe()
replicator.tell(Get(KeyE, readMajority), probe5.ref)
probe1.expectMsg(UpdateSuccess(KeyE, Some(153)))
@ -492,9 +493,9 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
"converge after many concurrent updates" in within(10.seconds) {
runOn(first, second, third) {
var c = GCounter()
for (i ← 0 until 100) {
c += 1
replicator ! Update(KeyF, GCounter(), writeTwo)(_ + 1)
for (_ ← 0 until 100) {
c :+= 1
replicator ! Update(KeyF, GCounter(), writeTwo)(_ :+ 1)
}
val results = receiveN(100)
results.map(_.getClass).toSet should be(Set(classOf[UpdateSuccess[_]]))
@ -510,7 +511,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
"read-repair happens before GetSuccess" in {
runOn(first) {
replicator ! Update(KeyG, ORSet(), writeTwo)(_ + "a" + "b")
replicator ! Update(KeyG, ORSet(), writeTwo)(_ :+ "a" :+ "b")
expectMsgType[UpdateSuccess[_]]
}
enterBarrier("a-b-added-to-G")
@ -528,20 +529,20 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
runOn(second) {
replicator ! Subscribe(KeyH, changedProbe.ref)
replicator ! Update(KeyH, ORMap.empty[String, Flag], writeTwo)(_ + ("a" → Flag.Disabled))
replicator ! Update(KeyH, ORMap.empty[String, Flag], writeTwo)(_ :+ ("a" → Flag.Disabled))
changedProbe.expectMsgPF() { case c @ Changed(KeyH) ⇒ c.get(KeyH).entries } should be(Map("a" → Flag.Disabled))
}
enterBarrier("update-h1")
runOn(first) {
replicator ! Update(KeyH, ORMap.empty[String, Flag], writeTwo)(_ + ("a" → Flag.Enabled))
replicator ! Update(KeyH, ORMap.empty[String, Flag], writeTwo)(_ :+ ("a" → Flag.Enabled))
}
runOn(second) {
changedProbe.expectMsgPF() { case c @ Changed(KeyH) ⇒ c.get(KeyH).entries } should be(Map("a" → Flag.Enabled))
replicator ! Update(KeyH, ORMap.empty[String, Flag], writeTwo)(_ + ("b" → Flag.Enabled))
replicator ! Update(KeyH, ORMap.empty[String, Flag], writeTwo)(_ :+ ("b" → Flag.Enabled))
changedProbe.expectMsgPF() { case c @ Changed(KeyH) ⇒ c.get(KeyH).entries } should be(
Map("a" → Flag.Enabled, "b" → Flag.Enabled))
}
@ -568,7 +569,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
replicator ! Update(KeyI, GSet.empty[String], writeTwo)(_ + "a")
}
changedProbe.expectNoMsg(1.second)
changedProbe.expectNoMessage(1.second)
enterBarrierAfterTestStep()
}
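The read-your-writes sequence exercised above relies only on message ordering between one sender and the replicator, so it can be demonstrated even on a single node, where a majority is one. A runnable sketch under that assumption (the config, key name, and timeouts are illustrative, not from this commit):

import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.ddata._
import akka.cluster.ddata.Replicator._
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory

object ReadYourWritesSketch extends App {
  val system = ActorSystem("Sketch", ConfigFactory.parseString(
    "akka.actor.provider = cluster\nakka.remote.netty.tcp.port = 0"))
  Cluster(system).join(Cluster(system).selfAddress) // single-node cluster, majority == 1
  implicit val node: SelfUniqueAddress = DistributedData(system).selfUniqueAddress
  val replicator = DistributedData(system).replicator

  val Key = GCounterKey("sketch-counter")
  val probe = TestProbe()(system)
  replicator.tell(Update(Key, GCounter(), WriteMajority(3.seconds))(_ :+ 1), probe.ref)
  replicator.tell(Get(Key, ReadMajority(3.seconds)), probe.ref)
  // the two replies may interleave, but the GetSuccess value
  // already contains the increment sent just before it
  probe.receiveN(2).foreach(println)
  system.terminate()
}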
View file
@ -4,13 +4,11 @@
package akka.cluster.ddata;
import akka.cluster.Cluster;
public class ORMapTest {
public void compileOnlyORMapTest() {
// primarily to check API accessibility with overloads/types
Cluster node1 = null;
SelfUniqueAddress node1 = null;
ORMap<String, PNCounterMap<String>> orMap = ORMap.create();
// updated needs a cast
View file
@ -4,13 +4,11 @@
package akka.cluster.ddata;
import akka.cluster.Cluster;
public class ORMultiMapTest {
public void compileOnlyORMultiMapTest() {
// primarily to check API accessibility with overloads/types
Cluster node = null;
SelfUniqueAddress node = null;
ORMultiMap<String, String> orMultiMap = ORMultiMap.create();
orMultiMap.addBinding(node, "a", "1");
orMultiMap.removeBinding(node, "a", "1");
View file
@ -4,8 +4,6 @@
package akka.cluster.ddata;
import akka.cluster.Cluster;
import java.math.BigInteger;
public class PNCounterTest {
@ -13,8 +11,8 @@ public class PNCounterTest {
public void compileOnlyPNCounterApiTest() {
// primarily to check API accessibility with overloads/types
Cluster node1 = null;
Cluster node2 = null;
SelfUniqueAddress node1 = null;
SelfUniqueAddress node2 = null;
PNCounter c1 = PNCounter.create();
View file
@ -56,17 +56,19 @@ class LWWRegisterSpec extends WordSpec with Matchers {
}
"use monotonically increasing defaultClock" in {
(1 to 100).foldLeft(LWWRegister(node1, 0, defaultClock)) {
implicit val node = SelfUniqueAddress(node1)
(1 to 100).foldLeft(LWWRegister.create(0)) {
case (r, n) ⇒
r.value should be(n - 1)
val r2 = r.withValue(node1, n, defaultClock[Int])
val r2 = r.withValueOf(n)
r2.timestamp should be > r.timestamp
r2
}
}
"have unapply extractor" in {
val r1 = LWWRegister(node1, "a", defaultClock)
val r1 = LWWRegister(node1, "a", defaultClock[String])
val LWWRegister(value1) = r1
val value2: String = value1
Changed(LWWRegisterKey[String]("key"))(r1) match {
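`withValueOf` is the SelfUniqueAddress-based replacement for the `withValue(node, value, clock)` overload: the node and the default clock come in implicitly, and `defaultClock` keeps timestamps strictly increasing even within one millisecond. A short sketch with a fabricated node identity (illustrative only):

import akka.actor.Address
import akka.cluster.UniqueAddress
import akka.cluster.ddata.{ LWWRegister, SelfUniqueAddress }

object LWWRegisterSketch extends App {
  implicit val node: SelfUniqueAddress =
    SelfUniqueAddress(UniqueAddress(Address("akka", "Sys", "host", 2552), 1L))

  val r0 = LWWRegister.create(0) // uses the implicit node and defaultClock
  val r1 = r0.withValueOf(1)
  assert(r1.value == 1 && r1.timestamp > r0.timestamp) // monotonic timestamps
}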
View file
@ -8,7 +8,6 @@ import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Stash
import akka.cluster.Cluster
import akka.testkit.ImplicitSender
import akka.testkit.TestKit
import com.typesafe.config.ConfigFactory
@ -25,12 +24,14 @@ object LocalConcurrencySpec {
}
class Updater extends Actor with Stash {
implicit val cluster = Cluster(context.system)
implicit val selfUniqueAddress = DistributedData(context.system).selfUniqueAddress
val replicator = DistributedData(context.system).replicator
def receive = {
case s: String ⇒
val update = Replicator.Update(Updater.key, ORSet.empty[String], Replicator.WriteLocal)(_ + s)
val update = Replicator.Update(Updater.key, ORSet.empty[String], Replicator.WriteLocal)(_ :+ s)
replicator ! update
}
}
View file
@ -4,20 +4,14 @@
package akka.cluster.ddata
import scala.concurrent.duration._
import java.util.concurrent.ThreadLocalRandom
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ddata.Replicator.Changed
import akka.cluster.ddata.Replicator.GetKeyIds
import akka.cluster.ddata.Replicator.GetKeyIdsResult
import akka.cluster.ddata.Replicator.Subscribe
import akka.cluster.ddata.Replicator.Update
import akka.cluster.ddata.Replicator.UpdateResponse
import akka.cluster.ddata.Replicator.WriteLocal
import com.typesafe.config.ConfigFactory
/**
@ -75,8 +69,8 @@ class LotsOfDataBot extends Actor with ActorLogging {
import LotsOfDataBot._
import Replicator._
implicit val selfUniqueAddress = DistributedData(context.system).selfUniqueAddress
val replicator = DistributedData(context.system).replicator
implicit val cluster = Cluster(context.system)
import context.dispatcher
val isPassive = context.system.settings.config.getBoolean("passive")
@ -110,10 +104,10 @@ class LotsOfDataBot extends Actor with ActorLogging {
val s = ThreadLocalRandom.current().nextInt(97, 123).toChar.toString
if (count <= maxEntries || ThreadLocalRandom.current().nextBoolean()) {
// add
replicator ! Update(key, ORSet(), WriteLocal)(_ + s)
replicator ! Update(key, ORSet(), WriteLocal)(_ :+ s)
} else {
// remove
replicator ! Update(key, ORSet(), WriteLocal)(_ - s)
replicator ! Update(key, ORSet(), WriteLocal)(_ remove s)
}
}
View file
@ -17,6 +17,11 @@ class PNCounterMapSpec extends WordSpec with Matchers {
"A PNCounterMap" must {
"be able to increment and decrement entries with implicit SelfUniqueAddress" in {
implicit val node = SelfUniqueAddress(node1)
PNCounterMap().incrementBy("a", 2).incrementBy("b", 1).incrementBy("b", 2).decrementBy("a", 1).entries should be(Map("a" → 1, "b" → 3))
}
"be able to increment and decrement entries" in {
val m = PNCounterMap().increment(node1, "a", 2).increment(node1, "b", 3).decrement(node2, "a", 1)
m.entries should be(Map("a" → 1, "b" → 3))
View file
@ -0,0 +1,31 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.ddata
import akka.testkit.AkkaSpec
import com.typesafe.config.ConfigFactory
import org.scalatest.{ BeforeAndAfterAll, WordSpecLike }
object ReplicatorSettingsSpec {
val config = ConfigFactory.parseString("""
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.remote.artery.canonical.hostname = 127.0.0.1""")
}
class ReplicatorSettingsSpec extends AkkaSpec(ReplicatorSettingsSpec.config)
with WordSpecLike with BeforeAndAfterAll {
"DistributedData" must {
"have the default replicator name" in {
ReplicatorSettings.name(system, None) should ===("ddataReplicator")
}
"have the prefixed replicator name" in {
ReplicatorSettings.name(system, Some("other")) should ===("otherDdataReplicator")
}
}
}
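`ReplicatorSettings.name` is the helper `DistributedData` now uses to name its system-level replicator actor, and the prefixed variant yields names like `otherDdataReplicator` for additional replicators. A sketch of starting a second, user-level replicator under that convention; the name string is spelled out because the helper appears to be akka-internal API, and everything below is illustrative:

import akka.actor.ActorSystem
import akka.cluster.ddata.{ Replicator, ReplicatorSettings }
import com.typesafe.config.ConfigFactory

object SecondReplicatorSketch extends App {
  val system = ActorSystem("Sketch", ConfigFactory.parseString("akka.actor.provider = cluster"))
  // same naming convention as ReplicatorSettings.name(system, Some("other"))
  val otherReplicator = system.actorOf(
    Replicator.props(ReplicatorSettings(system)), "otherDdataReplicator")
}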
View file
@ -186,7 +186,7 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem(
}
"serialize LWWRegister" in {
checkSerialization(LWWRegister(address1, "value1", LWWRegister.defaultClock))
checkSerialization(LWWRegister(address1, "value1", LWWRegister.defaultClock[String]))
checkSerialization(LWWRegister(address1, "value2", LWWRegister.defaultClock[String])
.withValue(address2, "value3", LWWRegister.defaultClock[String]))
}