diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/DistributedData.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/DistributedData.scala index 131f32ea3f..9db2bb447f 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/DistributedData.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/DistributedData.scala @@ -11,6 +11,7 @@ import akka.actor.typed.ActorRef import akka.actor.typed.ExtensionSetup import akka.annotation.DoNotInherit import akka.annotation.InternalApi +import akka.cluster.ddata.SelfUniqueAddress object DistributedData extends ExtensionId[DistributedData] { def get(system: ActorSystem[_]): DistributedData = apply(system) @@ -38,6 +39,8 @@ abstract class DistributedData extends Extension { * `ActorRef` of the [[Replicator]] . */ def replicator: ActorRef[Replicator.Command] + + def selfUniqueAddress: SelfUniqueAddress } /** @@ -48,6 +51,9 @@ abstract class DistributedData extends Extension { override val replicator: ActorRef[Replicator.Command] = akka.cluster.ddata.typed.scaladsl.DistributedData(system).replicator.narrow[Replicator.Command] + override val selfUniqueAddress: SelfUniqueAddress = + akka.cluster.ddata.typed.scaladsl.DistributedData(system).selfUniqueAddress + } object DistributedDataSetup { diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/DistributedData.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/DistributedData.scala index 54242785be..3925a7113c 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/DistributedData.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/DistributedData.scala @@ -10,6 +10,8 @@ import akka.actor.typed.ExtensionId import akka.actor.typed.ActorRef import akka.actor.ExtendedActorSystem import akka.actor.typed.Props +import akka.cluster.{ ddata ⇒ dd } +import akka.cluster.ddata.SelfUniqueAddress object DistributedData extends ExtensionId[DistributedData] { def get(system: ActorSystem[_]): DistributedData = apply(system) @@ -30,22 +32,30 @@ class DistributedData(system: ActorSystem[_]) extends Extension { import akka.actor.typed.scaladsl.adapter._ + private val settings: ReplicatorSettings = ReplicatorSettings(system) + private val untypedSystem = system.toUntyped.asInstanceOf[ExtendedActorSystem] - private val config = system.settings.config.getConfig("akka.cluster.distributed-data") - private val settings = ReplicatorSettings(config) + + implicit val selfUniqueAddress: SelfUniqueAddress = dd.DistributedData(untypedSystem).selfUniqueAddress /** * `ActorRef` of the [[Replicator]] . 
*/ - val replicator: ActorRef[Replicator.Command] = { - val configuredName = config.getString("name") - val name = "typed" + configuredName.take(1).toUpperCase + configuredName.drop(1) + val replicator: ActorRef[Replicator.Command] = + if (isTerminated) { + system.log.warning("Replicator points to dead letters: Make sure the cluster node is not terminated and has the proper role!") + system.deadLetters + } else { + val underlyingReplicator = dd.DistributedData(untypedSystem).replicator + val replicatorBehavior = Replicator.behavior(settings, underlyingReplicator) - val underlyingReplicator = akka.cluster.ddata.DistributedData(untypedSystem).replicator - val replicatorBehavior = Replicator.behavior(settings, underlyingReplicator) + system.internalSystemActorOf(replicatorBehavior, ReplicatorSettings.name(system), Props.empty) + } - system.internalSystemActorOf(replicatorBehavior, name, Props.empty) - } + /** + * Returns true if the cluster node is terminated or if this member is not tagged with the role configured for the replicas. + */ + private def isTerminated: Boolean = dd.DistributedData(untypedSystem).isTerminated } diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSettings.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSettings.scala index ea6232172c..28813a3c8c 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSettings.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSettings.scala @@ -7,6 +7,7 @@ package akka.cluster.ddata.typed.scaladsl import akka.cluster.{ ddata ⇒ dd } import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.adapter._ +import akka.annotation.InternalApi import com.typesafe.config.Config /** @@ -26,4 +27,12 @@ object ReplicatorSettings { */ def apply(config: Config): ReplicatorSettings = dd.ReplicatorSettings(config) + + /** + * INTERNAL API + * The name of the actor used in DistributedData extensions. 
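+ * For the typed extension the configured name gets a "typed" prefix, e.g. "typedDdataReplicator" with the default configuration.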
+ */ + @InternalApi private[akka] def name(system: ActorSystem[_]): String = + dd.ReplicatorSettings.name(system.toUntyped, Some("typed")) } diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/Cluster.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/Cluster.scala index 3cc2631d89..b2ebc2d201 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/Cluster.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/Cluster.scala @@ -10,10 +10,10 @@ import akka.cluster.ClusterEvent.{ ClusterDomainEvent, CurrentClusterState } import akka.cluster._ import akka.japi.Util import akka.actor.typed.{ ActorRef, ActorSystem, Extension, ExtensionId } -import akka.cluster.typed.internal.AdapterClusterImpl -import scala.collection.immutable - import akka.actor.typed.ExtensionSetup +import akka.cluster.typed.internal.AdapterClusterImpl + +import scala.collection.immutable /** * Messages for subscribing to changes in the cluster state diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterImpl.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterImpl.scala index d22601d3d4..29d0087fb1 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterImpl.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterImpl.scala @@ -7,13 +7,12 @@ package akka.cluster.typed.internal import akka.actor.typed.Props import akka.annotation.InternalApi import akka.cluster.ClusterEvent.MemberEvent -import akka.cluster.{ ClusterEvent, MemberStatus } +import akka.cluster.{ ClusterEvent, Member, MemberStatus, UniqueAddress } import akka.actor.typed.{ ActorRef, ActorSystem, Terminated } import akka.cluster.typed._ import akka.actor.typed.internal.adapter.ActorSystemAdapter import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ -import akka.cluster.Member /** * INTERNAL API: diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala index b892621ef4..a6fb324523 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala @@ -12,7 +12,9 @@ import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } import akka.actor.typed.{ ActorRef, Behavior, Terminated } import akka.annotation.InternalApi import akka.cluster.ClusterEvent.MemberRemoved -import akka.cluster.ddata.{ DistributedData, ORMultiMap, ORMultiMapKey, Replicator } +import akka.cluster.ddata.typed.scaladsl.DistributedData +import akka.cluster.{ ddata ⇒ dd } +import akka.cluster.ddata.{ ORMultiMap, ORMultiMapKey, Replicator } import akka.cluster.{ Cluster, ClusterEvent, UniqueAddress } import akka.remote.AddressUidExtension import akka.util.TypedMultiMap @@ -52,13 +54,14 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { final class Setup(ctx: ActorContext[Command]) { val untypedSystem = ctx.system.toUntyped val settings = ClusterReceptionistSettings(ctx.system) - val replicator = DistributedData(untypedSystem).replicator + val replicator = dd.DistributedData(untypedSystem).replicator val selfSystemUid = AddressUidExtension(untypedSystem).longAddressUid lazy val keepTombstonesFor = cluster.settings.PruneGossipTombstonesAfter match { case f: 
FiniteDuration ⇒ f case _ ⇒ throw new IllegalStateException("Cannot actually happen") } - implicit val cluster = Cluster(untypedSystem) + val cluster = Cluster(untypedSystem) + implicit val selfNodeAddress = DistributedData(ctx.system).selfUniqueAddress def newTombstoneDeadline() = Deadline(keepTombstonesFor) def selfUniqueAddress: UniqueAddress = cluster.selfUniqueAddress } diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala index 5642bdfd99..af22db13d4 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala @@ -8,8 +8,8 @@ import akka.actor.typed.ActorRef import akka.actor.typed.internal.receptionist.AbstractServiceKey import akka.actor.typed.receptionist.ServiceKey import akka.annotation.InternalApi -import akka.cluster.{ Cluster, UniqueAddress } -import akka.cluster.ddata.{ ORMultiMap, ORMultiMapKey } +import akka.cluster.UniqueAddress +import akka.cluster.ddata.{ ORMultiMap, ORMultiMapKey, SelfUniqueAddress } import akka.cluster.typed.internal.receptionist.ClusterReceptionist.{ DDataKey, EmptyORMultiMap, Entry } import scala.concurrent.duration.Deadline @@ -109,13 +109,13 @@ import scala.concurrent.duration.Deadline def entriesFor(key: AbstractServiceKey): Set[Entry] = entries.getOrElse(key.asServiceKey, Set.empty[Entry]) - def addBinding[T](key: ServiceKey[T], value: Entry)(implicit cluster: Cluster): ServiceRegistry = - copy(entries = entries.addBinding(key, value)) + def addBinding[T](key: ServiceKey[T], value: Entry)(implicit node: SelfUniqueAddress): ServiceRegistry = + copy(entries = entries.addBinding(node, key, value)) - def removeBinding[T](key: ServiceKey[T], value: Entry)(implicit cluster: Cluster): ServiceRegistry = - copy(entries = entries.removeBinding(key, value)) + def removeBinding[T](key: ServiceKey[T], value: Entry)(implicit node: SelfUniqueAddress): ServiceRegistry = + copy(entries = entries.removeBinding(node, key, value)) - def removeAll(entries: Map[AbstractServiceKey, Set[Entry]])(implicit cluster: Cluster): ServiceRegistry = { + def removeAll(entries: Map[AbstractServiceKey, Set[Entry]])(implicit node: SelfUniqueAddress): ServiceRegistry = { entries.foldLeft(this) { case (acc, (key, entries)) ⇒ entries.foldLeft(acc) { diff --git a/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java b/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java index 62cc712ebe..78fccbe73b 100644 --- a/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java +++ b/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java @@ -4,6 +4,7 @@ package akka.cluster.ddata.typed.javadsl; +import akka.cluster.ddata.*; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.junit.ClassRule; @@ -13,11 +14,6 @@ import org.scalatest.junit.JUnitSuite; // #sample import java.util.Optional; import akka.actor.typed.ActorSystem; -import akka.cluster.Cluster; -import akka.cluster.ddata.GCounter; -import akka.cluster.ddata.GCounterKey; -import akka.cluster.ddata.Key; -import akka.cluster.ddata.ReplicatedData; import akka.testkit.AkkaJUnitActorSystemResource; import akka.testkit.javadsl.TestKit; import akka.actor.typed.ActorRef; @@ -86,14 +82,14 @@ public class 
ReplicatorTest extends JUnitSuite { static class Counter extends AbstractBehavior<ClientCommand> { private final ActorRef<Replicator.Command> replicator; - private final Cluster node; + private final SelfUniqueAddress node; final ActorRef<Replicator.UpdateResponse<GCounter>> updateResponseAdapter; final ActorRef<Replicator.GetResponse<GCounter>> getResponseAdapter; final ActorRef<Replicator.Changed<GCounter>> changedAdapter; private int cachedValue = 0; - Counter(ActorRef<Replicator.Command> replicator, Cluster node, ActorContext<ClientCommand> ctx) { + Counter(ActorRef<Replicator.Command> replicator, SelfUniqueAddress node, ActorContext<ClientCommand> ctx) { this.replicator = replicator; this.node = node; @@ -116,9 +112,7 @@ public class ReplicatorTest extends JUnitSuite { public static Behavior<ClientCommand> create() { return Behaviors.setup((ctx) -> { - // The distributed data types still need the implicit untyped Cluster. - // We will look into another solution for that. - Cluster node = Cluster.get(Adapter.toUntyped(ctx.getSystem())); + SelfUniqueAddress node = DistributedData.get(ctx.getSystem()).selfUniqueAddress(); ActorRef<Replicator.Command> replicator = DistributedData.get(ctx.getSystem()).replicator(); return new Counter(replicator, node, ctx); @@ -127,7 +121,7 @@ public class ReplicatorTest extends JUnitSuite { // #sample // omitted from sample, needed for tests, factory above is for the docs sample - public static Behavior<ClientCommand> create(ActorRef<Replicator.Command> replicator, Cluster node) { + public static Behavior<ClientCommand> create(ActorRef<Replicator.Command> replicator, SelfUniqueAddress node) { return Behaviors.setup(ctx -> new Counter(replicator, node, ctx)); } // #sample @@ -213,7 +207,7 @@ public class ReplicatorTest extends JUnitSuite { ActorRef<Replicator.Command> replicator = Adapter.spawnAnonymous(system, Replicator.behavior(settings)); ActorRef<ClientCommand> client = - Adapter.spawnAnonymous(system, Counter.create(replicator, Cluster.get(system))); + Adapter.spawnAnonymous(system, Counter.create(replicator, DistributedData.get(typedSystem()).selfUniqueAddress())); client.tell(Increment.INSTANCE); client.tell(new GetValue(Adapter.toTyped(probe.getRef()))); @@ -227,7 +221,7 @@ public class ReplicatorTest extends JUnitSuite { ActorRef<Replicator.Command> replicator = Adapter.spawnAnonymous(system, Replicator.behavior(settings)); ActorRef<ClientCommand> client = - Adapter.spawnAnonymous(system, Counter.create(replicator, Cluster.get(system))); + Adapter.spawnAnonymous(system, Counter.create(replicator, DistributedData.get(typedSystem()).selfUniqueAddress())); client.tell(Increment.INSTANCE); client.tell(Increment.INSTANCE); diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala index 9d663600bb..ec5833a74f 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala @@ -6,13 +6,13 @@ package akka.cluster.ddata.typed.scaladsl import org.scalatest.WordSpecLike import akka.actor.testkit.typed.TestKitSettings +import akka.cluster.ddata.SelfUniqueAddress // #sample import akka.actor.Scheduler import akka.actor.typed.{ ActorRef, Behavior } import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.adapter._ import akka.cluster.Cluster import akka.cluster.ddata.typed.scaladsl.Replicator._ import akka.cluster.ddata.{ GCounter, GCounterKey } @@ -47,12 +47,9 @@ object ReplicatorSpec { val Key = GCounterKey("counter") - def client(replicator: ActorRef[Replicator.Command])(implicit cluster: Cluster): Behavior[ClientCommand] = + def client(replicator: ActorRef[Replicator.Command])(implicit node: 
SelfUniqueAddress): Behavior[ClientCommand] = Behaviors.setup[ClientCommand] { ctx ⇒ - // The distributed data types still need the implicit untyped Cluster. - // We will look into another solution for that. - val updateResponseAdapter: ActorRef[Replicator.UpdateResponse[GCounter]] = ctx.messageAdapter(InternalUpdateResponse.apply) @@ -68,7 +65,7 @@ object ReplicatorSpec { Behaviors.receive[ClientCommand] { (ctx, msg) ⇒ msg match { case Increment ⇒ - replicator ! Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal, updateResponseAdapter)(_ + 1) + replicator ! Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal, updateResponseAdapter)(_ :+ 1) Behaviors.same case GetValue(replyTo) ⇒ @@ -131,7 +128,7 @@ class ReplicatorSpec extends ScalaTestWithActorTestKit(ReplicatorSpec.config) wi implicit val testSettings = TestKitSettings(system) val settings = ReplicatorSettings(system) - implicit val cluster = Cluster(system.toUntyped) + implicit val selfNodeAddress = DistributedData(system).selfUniqueAddress "Replicator" must { @@ -172,6 +169,10 @@ class ReplicatorSpec extends ScalaTestWithActorTestKit(ReplicatorSpec.config) wi c ! GetValue(probe.ref) probe.expectMessage(1) } + + "have the prefixed replicator name" in { + ReplicatorSettings.name(system) should ===("typedDdataReplicator") + } } } diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/DistributedData.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DistributedData.scala index 76132fbb3c..782f191cc8 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/DistributedData.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DistributedData.scala @@ -10,7 +10,7 @@ import akka.actor.ExtendedActorSystem import akka.actor.Extension import akka.actor.ExtensionId import akka.actor.ExtensionIdProvider -import akka.cluster.Cluster +import akka.cluster.{ Cluster, UniqueAddress } object DistributedData extends ExtensionId[DistributedData] with ExtensionIdProvider { override def get(system: ActorSystem): DistributedData = super.get(system) @@ -28,14 +28,9 @@ object DistributedData extends ExtensionId[DistributedData] with ExtensionIdProv */ class DistributedData(system: ExtendedActorSystem) extends Extension { - private val config = system.settings.config.getConfig("akka.cluster.distributed-data") - private val settings = ReplicatorSettings(config) + private val settings = ReplicatorSettings(system) - /** - * Returns true if this member is not tagged with the role configured for the - * replicas. - */ - def isTerminated: Boolean = Cluster(system).isTerminated || !settings.roles.subsetOf(Cluster(system).selfRoles) + implicit val selfUniqueAddress: SelfUniqueAddress = SelfUniqueAddress(Cluster(system).selfUniqueAddress) /** * `ActorRef` of the [[Replicator]] . @@ -45,7 +40,20 @@ class DistributedData(system: ExtendedActorSystem) extends Extension { system.log.warning("Replicator points to dead letters: Make sure the cluster node is not terminated and has the proper role!") system.deadLetters } else { - val name = config.getString("name") - system.systemActorOf(Replicator.props(settings), name) + system.systemActorOf(Replicator.props(settings), ReplicatorSettings.name(system, None)) } + + /** + * Returns true if this member is not tagged with the role configured for the + * replicas. 
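+ * It also returns true if the cluster node has been terminated.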
+ */ + def isTerminated: Boolean = + Cluster(system).isTerminated || !settings.roles.subsetOf(Cluster(system).selfRoles) + } + +/** + * Cluster non-specific (typed vs untyped) wrapper for [[akka.cluster.UniqueAddress]]. + */ +@SerialVersionUID(1L) +final case class SelfUniqueAddress(uniqueAddress: UniqueAddress) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/GCounter.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/GCounter.scala index 7b0f9ab83c..c47e54c4bd 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/GCounter.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/GCounter.scala @@ -65,14 +65,19 @@ final class GCounter private[akka] ( * Increment the counter with the delta `n` specified. * The delta must be zero or positive. */ - def +(n: Long)(implicit node: Cluster): GCounter = increment(node, n) + def :+(n: Long)(implicit node: SelfUniqueAddress): GCounter = increment(node.uniqueAddress, n) + + @deprecated("Use `:+` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def +(n: Long)(implicit node: Cluster): GCounter = increment(node.selfUniqueAddress, n) /** * Increment the counter with the delta `n` specified. * The delta `n` must be zero or positive. */ - def increment(node: Cluster, n: Long = 1): GCounter = - increment(node.selfUniqueAddress, n) + def increment(node: SelfUniqueAddress, n: Long): GCounter = increment(node.uniqueAddress, n) + + @deprecated("Use `increment` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def increment(node: Cluster, n: Long = 1): GCounter = increment(node.selfUniqueAddress, n) /** * INTERNAL API diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala index 0c90f2c18a..75d3cec768 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala @@ -4,9 +4,9 @@ package akka.cluster.ddata +import akka.annotation.InternalApi import akka.cluster.Cluster import akka.cluster.UniqueAddress -import akka.annotation.InternalApi import akka.cluster.ddata.ORMap.ZeroTag object LWWMap { @@ -87,6 +87,12 @@ final class LWWMap[A, B] private[akka] ( /** * Adds an entry to the map */ + def :+(entry: (A, B))(implicit node: SelfUniqueAddress): LWWMap[A, B] = { + val (key, value) = entry + put(node, key, value) + } + + @deprecated("Use `:+` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def +(entry: (A, B))(implicit node: Cluster): LWWMap[A, B] = { val (key, value) = entry put(node, key, value) @@ -95,8 +101,12 @@ final class LWWMap[A, B] private[akka] ( /** * Adds an entry to the map */ + def put(node: SelfUniqueAddress, key: A, value: B): LWWMap[A, B] = + put(node.uniqueAddress, key, value, defaultClock[B]) + + @deprecated("Use `put` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def put(node: Cluster, key: A, value: B): LWWMap[A, B] = - put(node, key, value, defaultClock[B]) + put(node.selfUniqueAddress, key, value, defaultClock[B]) /** * Adds an entry to the map. @@ -106,6 +116,10 @@ final class LWWMap[A, B] private[akka] ( * increasing version number from a database record that is used for optimistic * concurrency control. 
*/ + def put(node: SelfUniqueAddress, key: A, value: B, clock: Clock[B]): LWWMap[A, B] = + put(node.uniqueAddress, key, value, clock) + + @deprecated("Use `put` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def put(node: Cluster, key: A, value: B, clock: Clock[B]): LWWMap[A, B] = put(node.selfUniqueAddress, key, value, clock) @@ -117,8 +131,9 @@ final class LWWMap[A, B] private[akka] ( * increasing version number from a database record that is used for optimistic * concurrency control. */ + @deprecated("Use `put` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def put(key: A, value: B)(implicit node: Cluster, clock: Clock[B] = defaultClock[B]): LWWMap[A, B] = - put(node, key, value, clock) + put(node.selfUniqueAddress, key, value, clock) /** * INTERNAL API @@ -136,6 +151,7 @@ final class LWWMap[A, B] private[akka] ( * Note that if there is a conflicting update on another node the entry will * not be removed after merge. */ + @deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def -(key: A)(implicit node: Cluster): LWWMap[A, B] = remove(node, key) /** @@ -143,6 +159,10 @@ final class LWWMap[A, B] private[akka] ( * Note that if there is a conflicting update on another node the entry will * not be removed after merge. */ + def remove(node: SelfUniqueAddress, key: A): LWWMap[A, B] = + remove(node.uniqueAddress, key) + + @deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def remove(node: Cluster, key: A): LWWMap[A, B] = remove(node.selfUniqueAddress, key) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWRegister.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWRegister.scala index 72538d4a73..6c72ddd80b 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWRegister.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWRegister.scala @@ -4,10 +4,10 @@ package akka.cluster.ddata +import akka.annotation.InternalApi import akka.cluster.Cluster import akka.cluster.UniqueAddress import akka.util.HashCode -import akka.annotation.InternalApi object LWWRegister { @@ -48,20 +48,48 @@ object LWWRegister { @InternalApi private[akka] def apply[A](node: UniqueAddress, initialValue: A, clock: Clock[A]): LWWRegister[A] = new LWWRegister(node, initialValue, clock(0L, initialValue)) + def apply[A](node: SelfUniqueAddress, initialValue: A): LWWRegister[A] = + apply(node.uniqueAddress, initialValue, defaultClock[A]) + + def apply[A](node: SelfUniqueAddress, initialValue: A, clock: Clock[A]): LWWRegister[A] = + apply(node.uniqueAddress, initialValue, clock) + + @deprecated("Use `apply` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def apply[A](initialValue: A)(implicit node: Cluster, clock: Clock[A] = defaultClock[A]): LWWRegister[A] = apply(node.selfUniqueAddress, initialValue, clock) + /** + * Scala API + * Creates a `LWWRegister` with implicits, given deprecated `apply` functions using Cluster constrain overloading. 
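+ * (A new `apply` overload taking an implicit `SelfUniqueAddress` would be ambiguous with the deprecated one, hence the separate `create` name.)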
+ */ + def create[A](initialValue: A)(implicit node: SelfUniqueAddress, clock: Clock[A] = defaultClock[A]): LWWRegister[A] = + apply(node.uniqueAddress, initialValue, clock) + /** * Java API */ + @deprecated("Use `create` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def create[A](node: Cluster, initialValue: A): LWWRegister[A] = apply(initialValue)(node) /** * Java API */ + @deprecated("Use `create` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def create[A](node: Cluster, initialValue: A, clock: Clock[A]): LWWRegister[A] = - apply(initialValue)(node, clock) + apply(node.selfUniqueAddress, initialValue, clock) + + /** + * Java API + */ + def create[A](node: SelfUniqueAddress, initialValue: A, clock: Clock[A]): LWWRegister[A] = + apply(node.uniqueAddress, initialValue, clock) + + /** + * Java API + */ + def create[A](node: SelfUniqueAddress, initialValue: A): LWWRegister[A] = + apply(node.uniqueAddress, initialValue, defaultClock[A]) /** * Extract the [[LWWRegister#value]]. @@ -122,13 +150,13 @@ final class LWWRegister[A] private[akka] ( * increasing version number from a database record that is used for optimistic * concurrency control. */ - def withValue(value: A)(implicit node: Cluster, clock: Clock[A] = defaultClock[A]): LWWRegister[A] = - withValue(node, value, clock) + def withValue(node: SelfUniqueAddress, value: A, clock: Clock[A]): LWWRegister[A] = + withValue(node.uniqueAddress, value, clock) /** * Change the value of the register. */ - def withValue(node: Cluster, value: A): LWWRegister[A] = + def withValue(node: SelfUniqueAddress, value: A): LWWRegister[A] = withValue(node, value, defaultClock[A]) /** @@ -139,6 +167,18 @@ final class LWWRegister[A] private[akka] ( * increasing version number from a database record that is used for optimistic * concurrency control. 
*/ + def withValueOf(value: A)(implicit node: SelfUniqueAddress, clock: Clock[A] = defaultClock[A]): LWWRegister[A] = + withValue(node, value, clock) + + @deprecated("Use `withValueOf` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def withValue(value: A)(implicit node: Cluster, clock: Clock[A] = defaultClock[A]): LWWRegister[A] = + withValue(node, value, clock) + + @deprecated("Use `withValue` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def withValue(node: Cluster, value: A): LWWRegister[A] = + withValue(node, value, defaultClock[A]) + + @deprecated("Use `withValue` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def withValue(node: Cluster, value: A, clock: Clock[A]): LWWRegister[A] = withValue(node.selfUniqueAddress, value, clock) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala index e9213d744b..57506caf51 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala @@ -199,9 +199,15 @@ final class ORMap[A, B <: ReplicatedData] private[akka] ( * Adds an entry to the map * @see [[#put]] */ + def :+(entry: (A, B))(implicit node: SelfUniqueAddress): ORMap[A, B] = { + val (key, value) = entry + put(node.uniqueAddress, key, value) + } + + @deprecated("Use `:+` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def +(entry: (A, B))(implicit node: Cluster): ORMap[A, B] = { val (key, value) = entry - put(node, key, value) + put(node.selfUniqueAddress, key, value) } /** @@ -217,6 +223,9 @@ final class ORMap[A, B <: ReplicatedData] private[akka] ( * value, because important history can be lost when replacing the `ORSet` and * undesired effects of merging will occur. Use [[ORMultiMap]] or [[#updated]] instead. */ + def put(node: SelfUniqueAddress, key: A, value: B): ORMap[A, B] = put(node.uniqueAddress, key, value) + + @deprecated("Use `put` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def put(node: Cluster, key: A, value: B): ORMap[A, B] = put(node.selfUniqueAddress, key, value) /** @@ -241,6 +250,10 @@ final class ORMap[A, B <: ReplicatedData] private[akka] ( * If there is no current value for the `key` the `initial` value will be * passed to the `modify` function. */ + def updated(node: SelfUniqueAddress, key: A, initial: B)(modify: B ⇒ B): ORMap[A, B] = + updated(node.uniqueAddress, key, initial)(modify) + + @deprecated("Use `updated` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def updated(node: Cluster, key: A, initial: B)(modify: B ⇒ B): ORMap[A, B] = updated(node.selfUniqueAddress, key, initial)(modify) @@ -251,9 +264,9 @@ final class ORMap[A, B <: ReplicatedData] private[akka] ( * passed to the `modify` function. */ @Deprecated - @deprecated("use update for the Java API as updated is ambiguous with the Scala API", "2.5.19") + @deprecated("use update for the Java API as updated is ambiguous with the Scala API", "2.5.20") def updated(node: Cluster, key: A, initial: B, modify: java.util.function.Function[B, B]): ORMap[A, B] = - updated(node, key, initial)(value ⇒ modify.apply(value)) + updated(node.selfUniqueAddress, key, initial)(value ⇒ modify.apply(value)) /** * Java API: Replace a value by applying the `modify` function on the existing value. 
@@ -261,6 +274,11 @@ final class ORMap[A, B <: ReplicatedData] private[akka] ( * If there is no current value for the `key` the `initial` value will be * passed to the `modify` function. */ + def update(node: SelfUniqueAddress, key: A, initial: B, modify: java.util.function.Function[B, B]): ORMap[A, B] = + updated(node.uniqueAddress, key, initial)(value ⇒ modify.apply(value)) + + @Deprecated + @deprecated("Use `update` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def update(node: Cluster, key: A, initial: B, modify: java.util.function.Function[B, B]): ORMap[A, B] = updated(node, key, initial)(value ⇒ modify.apply(value)) @@ -295,17 +313,25 @@ final class ORMap[A, B <: ReplicatedData] private[akka] ( } /** + * Scala API * Removes an entry from the map. * Note that if there is a conflicting update on another node the entry will * not be removed after merge. */ - def -(key: A)(implicit node: Cluster): ORMap[A, B] = remove(node, key) + def remove(key: A)(implicit node: SelfUniqueAddress): ORMap[A, B] = remove(node.uniqueAddress, key) /** + * Java API * Removes an entry from the map. * Note that if there is a conflicting update on another node the entry will * not be removed after merge. */ + def remove(node: SelfUniqueAddress, key: A): ORMap[A, B] = remove(node.uniqueAddress, key) + + @deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def -(key: A)(implicit node: Cluster): ORMap[A, B] = remove(node.selfUniqueAddress, key) + + @deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def remove(node: Cluster, key: A): ORMap[A, B] = remove(node.selfUniqueAddress, key) /** diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala index b7c69b262d..a84664c2e0 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala @@ -125,18 +125,28 @@ final class ORMultiMap[A, B] private[akka] ( def size: Int = underlying.keys.elements.size /** - * Convenience for put. Requires an implicit Cluster. + * Convenience for put. Requires an implicit SelfUniqueAddress. * @see [[#put]] */ + def :+(entry: (A, Set[B]))(implicit node: SelfUniqueAddress): ORMultiMap[A, B] = { + val (key, value) = entry + put(node.uniqueAddress, key, value) + } + + @deprecated("Use `:+` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def +(entry: (A, Set[B]))(implicit node: Cluster): ORMultiMap[A, B] = { val (key, value) = entry - put(node, key, value) + put(node.selfUniqueAddress, key, value) } /** * Scala API: Associate an entire set with the key while retaining the history of the previous * replicated data set. */ + def put(node: SelfUniqueAddress, key: A, value: Set[B]): ORMultiMap[A, B] = + put(node.uniqueAddress, key, value) + + @deprecated("Use `put` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def put(node: Cluster, key: A, value: Set[B]): ORMultiMap[A, B] = put(node.selfUniqueAddress, key, value) @@ -144,9 +154,16 @@ final class ORMultiMap[A, B] private[akka] ( * Java API: Associate an entire set with the key while retaining the history of the previous * replicated data set. 
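+ * The given `java.util.Set` is copied into an immutable Scala `Set`; later changes to it are not reflected in the map.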
*/ + def put(node: SelfUniqueAddress, key: A, value: java.util.Set[B]): ORMultiMap[A, B] = { + import scala.collection.JavaConverters._ + put(node.uniqueAddress, key, value.asScala.toSet) + } + + @Deprecated + @deprecated("Use `put` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def put(node: Cluster, key: A, value: java.util.Set[B]): ORMultiMap[A, B] = { import scala.collection.JavaConverters._ - put(node, key, value.asScala.toSet) + put(node.selfUniqueAddress, key, value.asScala.toSet) } /** @@ -159,18 +176,27 @@ final class ORMultiMap[A, B] private[akka] ( new ORMultiMap(newUnderlying, withValueDeltas) } + /** + * Scala API + * Remove an entire set associated with the key. + */ + def remove(key: A)(implicit node: SelfUniqueAddress): ORMultiMap[A, B] = remove(node.uniqueAddress, key) + /** * Convenience for remove. Requires an implicit Cluster. * @see [[#remove]] */ - def -(key: A)(implicit node: Cluster): ORMultiMap[A, B] = - remove(node, key) + @deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def -(key: A)(implicit node: Cluster): ORMultiMap[A, B] = remove(node.selfUniqueAddress, key) /** + * Java API * Remove an entire set associated with the key. */ - def remove(node: Cluster, key: A): ORMultiMap[A, B] = - remove(node.selfUniqueAddress, key) + def remove(node: SelfUniqueAddress, key: A): ORMultiMap[A, B] = remove(node.uniqueAddress, key) + + @deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def remove(node: Cluster, key: A): ORMultiMap[A, B] = remove(node.selfUniqueAddress, key) /** * INTERNAL API @@ -185,16 +211,22 @@ final class ORMultiMap[A, B] private[akka] ( } /** - * Scala API: Add an element to a set associated with a key. If there is no existing set then one will be initialised. + * Add an element to a set associated with a key. If there is no existing set then one will be initialised. + * TODO add implicit after deprecated is EOL. */ + def addBinding(node: SelfUniqueAddress, key: A, element: B): ORMultiMap[A, B] = + addBinding(node.uniqueAddress, key, element) + + def addBindingBy(key: A, element: B)(implicit node: SelfUniqueAddress): ORMultiMap[A, B] = + addBinding(node, key, element) + + @deprecated("Use `addBinding` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def addBinding(key: A, element: B)(implicit node: Cluster): ORMultiMap[A, B] = addBinding(node.selfUniqueAddress, key, element) - /** - * Java API: Add an element to a set associated with a key. If there is no existing set then one will be initialised. - */ + @deprecated("Use `addBinding` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def addBinding(node: Cluster, key: A, element: B): ORMultiMap[A, B] = - addBinding(key, element)(node) + addBinding(node.selfUniqueAddress, key, element) /** * INTERNAL API @@ -205,18 +237,24 @@ final class ORMultiMap[A, B] private[akka] ( } /** - * Scala API: Remove an element of a set associated with a key. If there are no more elements in the set then the + * Remove an element of a set associated with a key. If there are no more elements in the set then the * entire set will be removed. + * TODO add implicit after deprecated is EOL. 
*/ + def removeBinding(node: SelfUniqueAddress, key: A, element: B): ORMultiMap[A, B] = + removeBinding(node.uniqueAddress, key, element) + + def removeBindingBy(key: A, element: B)(implicit node: SelfUniqueAddress): ORMultiMap[A, B] = + removeBinding(node, key, element) + + @deprecated("Use `removeBinding` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def removeBinding(key: A, element: B)(implicit node: Cluster): ORMultiMap[A, B] = removeBinding(node.selfUniqueAddress, key, element) - /** - * Java API: Remove an element of a set associated with a key. If there are no more elements in the set then the - * entire set will be removed. - */ + @Deprecated + @deprecated("Use `removeBinding` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def removeBinding(node: Cluster, key: A, element: B): ORMultiMap[A, B] = - removeBinding(key, element)(node) + removeBinding(node.selfUniqueAddress, key, element) /** * INTERNAL API @@ -241,6 +279,13 @@ final class ORMultiMap[A, B] private[akka] ( * and another one is added within the same Update. The order of addition and removal is important in order * to retain history for replicated data. */ + def replaceBinding(node: SelfUniqueAddress, key: A, oldElement: B, newElement: B): ORMultiMap[A, B] = + replaceBinding(node.uniqueAddress, key, oldElement, newElement) + + def replaceBindingBy(key: A, oldElement: B, newElement: B)(implicit node: SelfUniqueAddress): ORMultiMap[A, B] = + replaceBinding(node, key, oldElement, newElement) + + @deprecated("Use `replaceBinding` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def replaceBinding(key: A, oldElement: B, newElement: B)(implicit node: Cluster): ORMultiMap[A, B] = replaceBinding(node.selfUniqueAddress, key, oldElement, newElement) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala index 3ea24e8b52..88916ad954 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala @@ -307,14 +307,17 @@ final class ORSet[A] private[akka] ( def size: Int = elementsMap.size - /** - * Adds an element to the set - */ - def +(element: A)(implicit node: Cluster): ORSet[A] = add(node, element) + /** Adds an element to the set. */ + def :+(element: A)(implicit node: SelfUniqueAddress): ORSet[A] = add(node, element) - /** - * Adds an element to the set - */ + @deprecated("Use `:+` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def +(element: A)(implicit node: Cluster): ORSet[A] = add(node.selfUniqueAddress, element) + + /** Adds an element to the set. */ + def add(node: SelfUniqueAddress, element: A): ORSet[A] = add(node.uniqueAddress, element) + + @Deprecated + @deprecated("Use `add` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def add(node: Cluster, element: A): ORSet[A] = add(node.selfUniqueAddress, element) /** @@ -335,13 +338,27 @@ final class ORSet[A] private[akka] ( } /** + * Scala API * Removes an element from the set. */ - def -(element: A)(implicit node: Cluster): ORSet[A] = remove(node, element) + def remove(element: A)(implicit node: SelfUniqueAddress): ORSet[A] = remove(node.uniqueAddress, element) + + /** + * Java API + * Removes an element from the set. + */ + def remove(node: SelfUniqueAddress, element: A): ORSet[A] = remove(node.uniqueAddress, element) /** * Removes an element from the set. 
*/ + @deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def -(element: A)(implicit node: Cluster): ORSet[A] = remove(node.selfUniqueAddress, element) + + /** + * Removes an element from the set. + */ + @deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def remove(node: Cluster, element: A): ORSet[A] = remove(node.selfUniqueAddress, element) /** @@ -362,6 +379,9 @@ final class ORSet[A] private[akka] ( * This has the same result as using [[#remove]] for each * element, but it is more efficient. */ + def clear(node: SelfUniqueAddress): ORSet[A] = clear(node.uniqueAddress) + + @deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def clear(node: Cluster): ORSet[A] = clear(node.selfUniqueAddress) /** diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounter.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounter.scala index 469c6a036f..3d2127e47f 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounter.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounter.scala @@ -62,67 +62,98 @@ final class PNCounter private[akka] ( * Increment the counter with the delta `n` specified. * If the delta is negative then it will decrement instead of increment. */ - def +(n: Long)(implicit node: Cluster): PNCounter = increment(node, n) + def :+(n: Long)(implicit node: SelfUniqueAddress): PNCounter = increment(node.uniqueAddress, n) + + @deprecated("Use `:+` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def +(n: Long)(implicit node: Cluster): PNCounter = increment(node.selfUniqueAddress, n) /** * Increment the counter with the delta `n` specified. * If the delta is negative then it will decrement instead of increment. */ - def +(n: BigInt)(implicit node: Cluster): PNCounter = increment(node, n) + def :+(n: BigInt)(implicit node: SelfUniqueAddress): PNCounter = increment(node.uniqueAddress, n) - /** - * Increment the counter with the delta `n` specified. - * If the delta is negative then it will decrement instead of increment. - */ - def increment(node: Cluster, n: Long = 1): PNCounter = - increment(node.selfUniqueAddress, n) + @deprecated("Use `:+` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def +(n: BigInt)(implicit node: Cluster): PNCounter = increment(node.selfUniqueAddress, n) /** * Scala API: Increment the counter with the delta `n` specified. * If the delta is negative then it will decrement instead of increment. */ - def increment(node: Cluster, n: BigInt): PNCounter = - increment(node.selfUniqueAddress, n) + def increment(n: Long)(implicit node: SelfUniqueAddress): PNCounter = increment(node.uniqueAddress, n) + + @deprecated("Use `increment` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def increment(node: Cluster, n: Long = 1): PNCounter = increment(node.selfUniqueAddress, n) + + /** + * Increment the counter with the delta `n` specified. + * If the delta is negative then it will decrement instead of increment. + */ + def increment(n: BigInt)(implicit node: SelfUniqueAddress): PNCounter = increment(node.uniqueAddress, n) + + @deprecated("Use `increment` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def increment(node: Cluster, n: BigInt): PNCounter = increment(node.selfUniqueAddress, n) /** * Java API: Increment the counter with the delta `n` specified. 
* If the delta is negative then it will decrement instead of increment. */ - def increment(node: Cluster, n: java.math.BigInteger): PNCounter = - increment(node.selfUniqueAddress, n) + def increment(node: SelfUniqueAddress, n: java.math.BigInteger): PNCounter = increment(node.uniqueAddress, n) + + /** + * Java API: Increment the counter with the delta `n` specified. + * If the delta is negative then it will decrement instead of increment. + */ + def increment(node: SelfUniqueAddress, n: Long): PNCounter = increment(node.uniqueAddress, n) + + @deprecated("Use `increment` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def increment(node: Cluster, n: java.math.BigInteger): PNCounter = increment(node.selfUniqueAddress, n) /** * Decrement the counter with the delta `n` specified. * If the delta is negative then it will increment instead of decrement. */ - def -(n: Long)(implicit node: Cluster): PNCounter = decrement(node, n) + def decrement(n: Long)(implicit node: SelfUniqueAddress): PNCounter = decrement(node.uniqueAddress, n) + + @deprecated("Use `decrement` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def -(n: Long)(implicit node: Cluster): PNCounter = decrement(node.selfUniqueAddress, n) /** * Decrement the counter with the delta `n` specified. * If the delta is negative then it will increment instead of decrement. */ - def -(n: BigInt)(implicit node: Cluster): PNCounter = decrement(node, n) + def decrement(n: BigInt)(implicit node: SelfUniqueAddress): PNCounter = decrement(node.uniqueAddress, n) + + @deprecated("Use `decrement` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def -(n: BigInt)(implicit node: Cluster): PNCounter = decrement(node.selfUniqueAddress, n) /** * Decrement the counter with the delta `n` specified. * If the delta `n` is negative then it will increment instead of decrement. */ - def decrement(node: Cluster, n: Long = 1): PNCounter = - decrement(node.selfUniqueAddress, n) + def decrement(node: SelfUniqueAddress, n: Long): PNCounter = decrement(node.uniqueAddress, n) + + @deprecated("Use `decrement` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def decrement(node: Cluster, n: Long = 1): PNCounter = decrement(node.selfUniqueAddress, n) /** * Scala API: Decrement the counter with the delta `n` specified. * If the delta `n` is negative then it will increment instead of decrement. */ - def decrement(node: Cluster, n: BigInt): PNCounter = - decrement(node.selfUniqueAddress, n) + def decrement(node: SelfUniqueAddress, n: BigInt): PNCounter = decrement(node.uniqueAddress, n) + + @deprecated("Use `decrement` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def decrement(node: Cluster, n: BigInt): PNCounter = decrement(node.selfUniqueAddress, n) /** * Java API: Decrement the counter with the delta `n` specified. * If the delta `n` is negative then it will increment instead of decrement. 
*/ - def decrement(node: Cluster, n: java.math.BigInteger): PNCounter = - decrement(node.selfUniqueAddress, n) + def decrement(node: SelfUniqueAddress, n: java.math.BigInteger): PNCounter = decrement(node.uniqueAddress, n) + + @Deprecated + @deprecated("Use `decrement` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def decrement(node: Cluster, n: java.math.BigInteger): PNCounter = decrement(node.selfUniqueAddress, n) /** Internal API */ @InternalApi private[akka] def increment(key: UniqueAddress, n: BigInt): PNCounter = change(key, n) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala index ae58ba6616..703a5611d7 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala @@ -4,11 +4,11 @@ package akka.cluster.ddata -import akka.cluster.Cluster -import akka.cluster.UniqueAddress import java.math.BigInteger import akka.annotation.InternalApi +import akka.cluster.Cluster +import akka.cluster.UniqueAddress import akka.cluster.ddata.ORMap._ object PNCounterMap { @@ -75,13 +75,25 @@ final class PNCounterMap[A] private[akka] ( * Increment the counter with the delta specified. * If the delta is negative then it will decrement instead of increment. */ - def increment(key: A, delta: Long = 1)(implicit node: Cluster): PNCounterMap[A] = - increment(node, key, delta) + def incrementBy(key: A, delta: Long)(implicit node: SelfUniqueAddress): PNCounterMap[A] = + increment(node.uniqueAddress, key, delta) /** * Increment the counter with the delta specified. * If the delta is negative then it will decrement instead of increment. */ + @deprecated("Use `incrementBy` or `increment` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def increment(key: A, delta: Long = 1)(implicit node: Cluster): PNCounterMap[A] = + increment(node.selfUniqueAddress, key, delta) + + /** + * Increment the counter with the delta specified. + * If the delta is negative then it will decrement instead of increment. + */ + def increment(node: SelfUniqueAddress, key: A, delta: Long): PNCounterMap[A] = + increment(node.uniqueAddress, key, delta) + + @deprecated("Use `increment` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def increment(node: Cluster, key: A, delta: Long): PNCounterMap[A] = increment(node.selfUniqueAddress, key, delta) @@ -94,14 +106,28 @@ final class PNCounterMap[A] private[akka] ( /** * Decrement the counter with the delta specified. * If the delta is negative then it will increment instead of decrement. + * TODO add implicit after deprecated is EOL. */ - def decrement(key: A, delta: Long = 1)(implicit node: Cluster): PNCounterMap[A] = + def decrementBy(key: A, delta: Long = 1)(implicit node: SelfUniqueAddress): PNCounterMap[A] = decrement(node, key, delta) /** * Decrement the counter with the delta specified. * If the delta is negative then it will increment instead of decrement. + * TODO add implicit after deprecated is EOL. */ + def decrement(node: SelfUniqueAddress, key: A, delta: Long): PNCounterMap[A] = + decrement(node.uniqueAddress, key, delta) + + @deprecated("Use `decrement` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def decrement(key: A, delta: Long = 1)(implicit node: Cluster): PNCounterMap[A] = + decrement(node.selfUniqueAddress, key, delta) + + /** * Decrement the counter with the delta specified. * If the delta is negative then it will increment instead of decrement. 
+ */ + @deprecated("Use `decrement` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def decrement(node: Cluster, key: A, delta: Long): PNCounterMap[A] = decrement(node.selfUniqueAddress, key, delta) @@ -117,16 +142,16 @@ final class PNCounterMap[A] private[akka] ( * Note that if there is a conflicting update on another node the entry will * not be removed after merge. */ - def -(key: A)(implicit node: Cluster): PNCounterMap[A] = remove(node, key) + def remove(key: A)(implicit node: SelfUniqueAddress): PNCounterMap[A] = + remove(node.uniqueAddress, key) - /** - * Removes an entry from the map. - * Note that if there is a conflicting update on another node the entry will - * not be removed after merge. - */ + @deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def remove(node: Cluster, key: A): PNCounterMap[A] = remove(node.selfUniqueAddress, key) + @deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def -(key: A)(implicit node: Cluster): PNCounterMap[A] = remove(node, key) + /** * INTERNAL API */ diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index aa7b3ee3f7..2f2856d264 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -100,6 +100,15 @@ object ReplicatorSettings { */ @InternalApi private[akka] def roleOption(role: String): Option[String] = if (role == "") None else Option(role) + + /** + * INTERNAL API + * The name of the actor used in DistributedData extensions. + */ + @InternalApi private[akka] def name(system: ActorSystem, modifier: Option[String]): String = { + val name = system.settings.config.getString("akka.cluster.distributed-data.name") + modifier.map(s ⇒ s + name.take(1).toUpperCase + name.drop(1)).getOrElse(name) + } } /** diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala index 070bdb7c46..bee7d3c2cd 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala @@ -9,7 +9,6 @@ import scala.annotation.tailrec import scala.collection.immutable.TreeMap import akka.cluster.Cluster import akka.cluster.UniqueAddress -import akka.cluster.UniqueAddress import akka.annotation.InternalApi /** @@ -107,7 +106,10 @@ sealed abstract class VersionVector /** * Increment the version for the node passed as argument. Returns a new VersionVector. */ - def +(node: Cluster): VersionVector = increment(node) + def :+(node: SelfUniqueAddress): VersionVector = increment(node) + + @deprecated("Use `:+` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") + def +(node: Cluster): VersionVector = increment(node.selfUniqueAddress) /** * INTERNAL API @@ -118,6 +120,9 @@ sealed abstract class VersionVector /** * Increment the version for the node passed as argument. Returns a new VersionVector. 
*/ + def increment(node: SelfUniqueAddress): VersionVector = increment(node.uniqueAddress) + + @deprecated("Use `increment` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") def increment(node: Cluster): VersionVector = increment(node.selfUniqueAddress) def isEmpty: Boolean diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala index e8282bbde0..33387f240b 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala @@ -50,7 +50,7 @@ object DurableDataSpec { else sender() ! LoadAllCompleted - case Store(key, data, reply) ⇒ + case Store(_, _, reply) ⇒ if (failStore) reply match { case Some(StoreReply(_, failureMsg, replyTo)) ⇒ replyTo ! failureMsg case None ⇒ @@ -79,8 +79,8 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig) override def initialParticipants = roles.size - implicit val cluster = Cluster(system) - + val cluster = Cluster(system) + implicit val selfUniqueAddress = DistributedData(system).selfUniqueAddress val timeout = 14.seconds.dilated // initialization of lmdb can be very slow in CI environment val writeTwo = WriteTo(2, timeout) val readTwo = ReadFrom(2, timeout) @@ -123,9 +123,9 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig) r ! Get(KeyA, ReadLocal) expectMsg(NotFound(KeyA, None)) - r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1) - r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1) - r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1) + r ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 1) + r ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 1) + r ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 1) expectMsg(UpdateSuccess(KeyA, None)) expectMsg(UpdateSuccess(KeyA, None)) @@ -163,10 +163,10 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig) } enterBarrier("both-initialized") - r ! Update(KeyA, GCounter(), writeTwo)(_ + 1) + r ! Update(KeyA, GCounter(), writeTwo)(_ :+ 1) expectMsg(UpdateSuccess(KeyA, None)) - r ! Update(KeyC, ORSet.empty[String], writeTwo)(_ + myself.name) + r ! Update(KeyC, ORSet.empty[String], writeTwo)(_ :+ myself.name) expectMsg(UpdateSuccess(KeyC, None)) enterBarrier("update-done-" + testStepCounter) @@ -203,7 +203,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig) val r = newReplicator() runOn(first) { - r ! Update(KeyC, ORSet.empty[String], WriteLocal)(_ + myself.name) + r ! Update(KeyC, ORSet.empty[String], WriteLocal)(_ :+ myself.name) expectMsg(UpdateSuccess(KeyC, None)) } @@ -213,7 +213,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig) // must do one more roundtrip to be sure that it keyB is stored, since Changed might have // been sent out before storage - r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1) + r ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 1) expectMsg(UpdateSuccess(KeyA, None)) watch(r) @@ -254,10 +254,10 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig) r ! Get(KeyA, ReadLocal) expectMsg(NotFound(KeyA, None)) - r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1) - r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1) - r ! Update(KeyA, GCounter(), WriteLocal)(_ + 1) - r ! Update(KeyB, GCounter(), WriteLocal)(_ + 1) + r ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 1) + r ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 1) + r ! 
Update(KeyA, GCounter(), WriteLocal)(_ :+ 1) + r ! Update(KeyB, GCounter(), WriteLocal)(_ :+ 1) expectMsg(UpdateSuccess(KeyA, None)) expectMsg(UpdateSuccess(KeyA, None)) @@ -286,7 +286,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig) val r2: ActorRef = newReplicator(sys2) // it should be possible to update while loading is in progress - r2 ! Update(KeyB, GCounter(), WriteLocal)(_ + 1) + r2 ! Update(KeyB, GCounter(), WriteLocal)(_ :+ 1) expectMsg(UpdateSuccess(KeyB, None)) // wait until all loaded @@ -325,7 +325,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig) Replicator.props( ReplicatorSettings(system).withDurableStoreProps(testDurableStoreProps(failStore = true))), "replicator-" + testStepCounter) - r ! Update(KeyA, GCounter(), WriteLocal, request = Some("a"))(_ + 1) + r ! Update(KeyA, GCounter(), WriteLocal, request = Some("a"))(_ :+ 1) expectMsg(StoreFailure(KeyA, Some("a"))) } enterBarrierAfterTestStep() diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala index 30f9da6a90..33de60f48f 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala @@ -43,7 +43,8 @@ class DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiN override def initialParticipants = roles.size - implicit val cluster = Cluster(system) + val cluster = Cluster(system) + implicit val selfUniqueAddress = DistributedData(system).selfUniqueAddress val maxPruningDissemination = 3.seconds def startReplicator(sys: ActorSystem): ActorRef = @@ -90,7 +91,7 @@ class DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiN } } - replicator ! Update(KeyA, GCounter(), WriteLocal)(_ + 3) + replicator ! 
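The spec changes above all follow the same recipe: keep a plain `Cluster` value where cluster membership is still needed, and introduce an implicit `SelfUniqueAddress` for the CRDT operators. A self-contained sketch of that recipe (system and key names are illustrative):

```scala
import akka.actor.ActorSystem
import akka.cluster.ddata._
import akka.cluster.ddata.Replicator._

val system = ActorSystem("sketch") // assumes akka.actor.provider = "cluster"
implicit val node: SelfUniqueAddress = DistributedData(system).selfUniqueAddress
val replicator = DistributedData(system).replicator

val KeyA = GCounterKey("counter-a")
replicator ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 1) // previously (_ + 1)
```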
Update(KeyA, GCounter(), WriteLocal)(_ :+ 3) expectMsg(UpdateSuccess(KeyA, None)) replicator2.tell(Update(KeyA, GCounter(), WriteLocal)(_.increment(cluster2, 2)), probe2.ref) diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/JepsenInspiredInsertSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/JepsenInspiredInsertSpec.scala index 03794ab708..2903a45ec4 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/JepsenInspiredInsertSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/JepsenInspiredInsertSpec.scala @@ -49,7 +49,8 @@ class JepsenInspiredInsertSpec extends MultiNodeSpec(JepsenInspiredInsertSpec) w override def initialParticipants = roles.size - implicit val cluster = Cluster(system) + val cluster = Cluster(system) + implicit val selfUniqueAddress = DistributedData(system).selfUniqueAddress val replicator = DistributedData(system).replicator val nodes = roles.drop(1) // controller not part of active nodes val nodeCount = nodes.size @@ -114,7 +115,7 @@ class JepsenInspiredInsertSpec extends MultiNodeSpec(JepsenInspiredInsertSpec) w val writeProbe = TestProbe() val writeAcks = myData.map { i ⇒ sleepDelay() - replicator.tell(Update(key, ORSet(), WriteLocal, Some(i))(_ + i), writeProbe.ref) + replicator.tell(Update(key, ORSet(), WriteLocal, Some(i))(_ :+ i), writeProbe.ref) writeProbe.receiveOne(3.seconds) } val successWriteAcks = writeAcks.collect { case success: UpdateSuccess[_] ⇒ success } @@ -147,7 +148,7 @@ class JepsenInspiredInsertSpec extends MultiNodeSpec(JepsenInspiredInsertSpec) w val writeProbe = TestProbe() val writeAcks = myData.map { i ⇒ sleepDelay() - replicator.tell(Update(key, ORSet(), writeMajority, Some(i))(_ + i), writeProbe.ref) + replicator.tell(Update(key, ORSet(), writeMajority, Some(i))(_ :+ i), writeProbe.ref) writeProbe.receiveOne(timeout + 1.second) } val successWriteAcks = writeAcks.collect { case success: UpdateSuccess[_] ⇒ success } @@ -163,7 +164,7 @@ class JepsenInspiredInsertSpec extends MultiNodeSpec(JepsenInspiredInsertSpec) w val readProbe = TestProbe() replicator.tell(Get(key, readMajority), readProbe.ref) val result = readProbe.expectMsgPF() { case g @ GetSuccess(`key`, _) ⇒ g.get(key) } - val survivors = result.elements.size + //val survivors = result.elements.size result.elements should be(expectedData) } @@ -191,7 +192,7 @@ class JepsenInspiredInsertSpec extends MultiNodeSpec(JepsenInspiredInsertSpec) w val writeProbe = TestProbe() val writeAcks = myData.map { i ⇒ sleepDelay() - replicator.tell(Update(key, ORSet(), WriteLocal, Some(i))(_ + i), writeProbe.ref) + replicator.tell(Update(key, ORSet(), WriteLocal, Some(i))(_ :+ i), writeProbe.ref) writeProbe.receiveOne(3.seconds) } val successWriteAcks = writeAcks.collect { case success: UpdateSuccess[_] ⇒ success } @@ -236,7 +237,7 @@ class JepsenInspiredInsertSpec extends MultiNodeSpec(JepsenInspiredInsertSpec) w val writeProbe = TestProbe() val writeAcks = myData.map { i ⇒ sleepDelay() - replicator.tell(Update(key, ORSet(), writeMajority, Some(i))(_ + i), writeProbe.ref) + replicator.tell(Update(key, ORSet(), writeMajority, Some(i))(_ :+ i), writeProbe.ref) writeProbe.receiveOne(timeout + 1.second) } val successWriteAcks = writeAcks.collect { case success: UpdateSuccess[_] ⇒ success } @@ -260,7 +261,7 @@ class JepsenInspiredInsertSpec extends MultiNodeSpec(JepsenInspiredInsertSpec) w val readProbe = TestProbe() replicator.tell(Get(key, readMajority), readProbe.ref) val result = readProbe.expectMsgPF() { 
case g @ GetSuccess(`key`, _) ⇒ g.get(key) } - val survivors = result.elements.size + //val survivors = result.elements.size result.elements should be(expectedData) } // but on the 3 node side, read from majority doesn't mean that we are guaranteed to see diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/PerformanceSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/PerformanceSpec.scala index a789dad573..31a95a4e80 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/PerformanceSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/PerformanceSpec.scala @@ -69,7 +69,8 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe override def initialParticipants = roles.size - implicit val cluster = Cluster(system) + val cluster = Cluster(system) + implicit val selfUniqueAddress = DistributedData(system).selfUniqueAddress val replicator = DistributedData(system).replicator val timeout = 3.seconds.dilated val factor = 1 // use 3 here for serious tuning @@ -156,7 +157,7 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe val n = 1000 * factor val expectedData = (0 until n).toSet repeat("ORSet Update WriteLocal", keys, n)({ (key, i, replyTo) ⇒ - replicator.tell(Update(key, ORSet(), WriteLocal)(_ + i), replyTo) + replicator.tell(Update(key, ORSet(), WriteLocal)(_ :+ i), replyTo) }, key ⇒ awaitReplicated(key, expectedData)) enterBarrier("after-1") @@ -164,7 +165,7 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe "be blazingly fast for ORSet Get ReadLocal" taggedAs PerformanceTest in { val keys = (1 to repeatCount).map(n ⇒ ORSetKey[Int]("A" + n)) - repeat("Get ReadLocal", keys, 100000 * factor) { (key, i, replyTo) ⇒ + repeat("Get ReadLocal", keys, 100000 * factor) { (key, _, replyTo) ⇒ replicator.tell(Get(key, ReadLocal), replyTo) } enterBarrier("after-2") @@ -175,7 +176,7 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe val n = 200 * factor val expected = Some((0 until n).toSet) repeat("ORSet Update WriteLocal + gossip", keys, n, expected) { (key, i, replyTo) ⇒ - replicator.tell(Update(key, ORSet(), WriteLocal)(_ + i), replyTo) + replicator.tell(Update(key, ORSet(), WriteLocal)(_ :+ i), replyTo) } enterBarrier("after-3") } @@ -185,7 +186,7 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe val n = 200 * factor val expected = Some((0 until n).toSet ++ (0 until n).map(-_).toSet) repeat("ORSet Update WriteLocal existing + gossip", keys, n, expected) { (key, i, replyTo) ⇒ - replicator.tell(Update(key, ORSet(), WriteLocal)(_ + (-i)), replyTo) + replicator.tell(Update(key, ORSet(), WriteLocal)(_ :+ (-i)), replyTo) } enterBarrier("after-4") } @@ -196,7 +197,7 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe val expected = Some((0 until n).toSet) val writeTwo = WriteTo(2, timeout) repeat("ORSet Update WriteTwo + gossip", keys, n, expected) { (key, i, replyTo) ⇒ - replicator.tell(Update(key, ORSet(), writeTwo)(_ + i), replyTo) + replicator.tell(Update(key, ORSet(), writeTwo)(_ :+ i), replyTo) } enterBarrier("after-5") } @@ -209,7 +210,7 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe val latch = TestLatch(n) val replyTo = system.actorOf(countDownProps(latch)) for (_ ← 0 until n) - replicator.tell(Update(key, GCounter(), WriteLocal)(_ + 1), replyTo) + replicator.tell(Update(key, GCounter(), 
WriteLocal)(_ :+ 1), replyTo) Await.ready(latch, 5.seconds + (1.second * factor)) enterBarrier("update-done-6") runOn(n1) { @@ -247,7 +248,7 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe val n = 300 * factor val writeMajority = WriteMajority(timeout) repeat("ORSet Update one-by-one deltas", keys, n, oneByOne = true) { (key, i, replyTo) ⇒ - replicator.tell(Update(key, ORSet(), writeMajority)(_ + i), replyTo) + replicator.tell(Update(key, ORSet(), writeMajority)(_ :+ i), replyTo) } enterBarrier("after-7") } @@ -257,7 +258,7 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe val n = 200 * factor val writeMajority = WriteMajority(timeout) repeat("ORSet Update deltas", keys, n, oneByOne = false) { (key, i, replyTo) ⇒ - replicator.tell(Update(key, ORSet(), writeMajority)(_ + i), replyTo) + replicator.tell(Update(key, ORSet(), writeMajority)(_ :+ i), replyTo) } enterBarrier("after-8") } diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorChaosSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorChaosSpec.scala index fd50dcc739..2ca41a760f 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorChaosSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorChaosSpec.scala @@ -43,7 +43,8 @@ class ReplicatorChaosSpec extends MultiNodeSpec(ReplicatorChaosSpec) with STMult override def initialParticipants = roles.size - implicit val cluster = Cluster(system) + val cluster = Cluster(system) + implicit val selfUniqueAddress = DistributedData(system).selfUniqueAddress val replicator = system.actorOf(Replicator.props( ReplicatorSettings(system).withRole("backend").withGossipInterval(1.second)), "replicator") val timeout = 3.seconds.dilated @@ -104,18 +105,18 @@ class ReplicatorChaosSpec extends MultiNodeSpec(ReplicatorChaosSpec) with STMult } runOn(first) { - (0 until 5).foreach { i ⇒ - replicator ! Update(KeyA, GCounter(), WriteLocal)(_ + 1) - replicator ! Update(KeyB, PNCounter(), WriteLocal)(_ - 1) - replicator ! Update(KeyC, GCounter(), WriteAll(timeout))(_ + 1) + for (_ ← 0 until 5) { + replicator ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 1) + replicator ! Update(KeyB, PNCounter(), WriteLocal)(_ decrement 1) + replicator ! Update(KeyC, GCounter(), WriteAll(timeout))(_ :+ 1) } receiveN(15).map(_.getClass).toSet should be(Set(classOf[UpdateSuccess[_]])) } runOn(second) { - replicator ! Update(KeyA, GCounter(), WriteLocal)(_ + 20) - replicator ! Update(KeyB, PNCounter(), WriteTo(2, timeout))(_ + 20) - replicator ! Update(KeyC, GCounter(), WriteAll(timeout))(_ + 20) + replicator ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 20) + replicator ! Update(KeyB, PNCounter(), WriteTo(2, timeout))(_ :+ 20) + replicator ! Update(KeyC, GCounter(), WriteAll(timeout))(_ :+ 20) receiveN(3).toSet should be(Set( UpdateSuccess(KeyA, None), UpdateSuccess(KeyB, None), UpdateSuccess(KeyC, None))) @@ -123,23 +124,23 @@ class ReplicatorChaosSpec extends MultiNodeSpec(ReplicatorChaosSpec) with STMult replicator ! Update(KeyE, GSet(), WriteLocal)(_ + "e1" + "e2") expectMsg(UpdateSuccess(KeyE, None)) - replicator ! Update(KeyF, ORSet(), WriteLocal)(_ + "e1" + "e2") + replicator ! Update(KeyF, ORSet(), WriteLocal)(_ :+ "e1" :+ "e2") expectMsg(UpdateSuccess(KeyF, None)) } runOn(fourth) { - replicator ! Update(KeyD, GCounter(), WriteLocal)(_ + 40) + replicator ! 
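`ORSet` updates in these benchmarks now use `:+` and `remove` with the implicit `SelfUniqueAddress` instead of `+` and `-` with the implicit `Cluster`. The same change in isolation (hypothetical helper, implicit assumed in scope):

```scala
import akka.cluster.ddata.{ ORSet, SelfUniqueAddress }

def example(implicit node: SelfUniqueAddress): ORSet[String] = {
  val s = ORSet.empty[String] :+ "a" :+ "b" // previously: + "a" + "b"
  s.remove("a")                             // previously: - "a"
}
```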
Update(KeyD, GCounter(), WriteLocal)(_ :+ 40) expectMsg(UpdateSuccess(KeyD, None)) replicator ! Update(KeyE, GSet(), WriteLocal)(_ + "e2" + "e3") expectMsg(UpdateSuccess(KeyE, None)) - replicator ! Update(KeyF, ORSet(), WriteLocal)(_ + "e2" + "e3") + replicator ! Update(KeyF, ORSet(), WriteLocal)(_ :+ "e2" :+ "e3") expectMsg(UpdateSuccess(KeyF, None)) } runOn(fifth) { - replicator ! Update(KeyX, GCounter(), WriteTo(2, timeout))(_ + 50) + replicator ! Update(KeyX, GCounter(), WriteTo(2, timeout))(_ :+ 50) expectMsg(UpdateSuccess(KeyX, None)) replicator ! Delete(KeyX, WriteLocal) expectMsg(DeleteSuccess(KeyX, None)) @@ -168,22 +169,22 @@ class ReplicatorChaosSpec extends MultiNodeSpec(ReplicatorChaosSpec) with STMult enterBarrier("split") runOn(first) { - replicator ! Update(KeyA, GCounter(), WriteTo(2, timeout))(_ + 1) + replicator ! Update(KeyA, GCounter(), WriteTo(2, timeout))(_ :+ 1) expectMsg(UpdateSuccess(KeyA, None)) } runOn(third) { - replicator ! Update(KeyA, GCounter(), WriteTo(2, timeout))(_ + 2) + replicator ! Update(KeyA, GCounter(), WriteTo(2, timeout))(_ :+ 2) expectMsg(UpdateSuccess(KeyA, None)) replicator ! Update(KeyE, GSet(), WriteTo(2, timeout))(_ + "e4") expectMsg(UpdateSuccess(KeyE, None)) - replicator ! Update(KeyF, ORSet(), WriteTo(2, timeout))(_ - "e2") + replicator ! Update(KeyF, ORSet(), WriteTo(2, timeout))(_ remove "e2") expectMsg(UpdateSuccess(KeyF, None)) } runOn(fourth) { - replicator ! Update(KeyD, GCounter(), WriteTo(2, timeout))(_ + 1) + replicator ! Update(KeyD, GCounter(), WriteTo(2, timeout))(_ :+ 1) expectMsg(UpdateSuccess(KeyD, None)) } enterBarrier("update-during-split") diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala index f869666640..ee3b083a1b 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala @@ -141,7 +141,8 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult override def initialParticipants = roles.size - implicit val cluster = Cluster(system) + val cluster = Cluster(system) + implicit val selfUniqueAddress = DistributedData(system).selfUniqueAddress val fullStateReplicator = system.actorOf(Replicator.props( ReplicatorSettings(system).withGossipInterval(1.second).withDeltaCrdtEnabled(false)), "fullStateReplicator") val deltaReplicator = { @@ -199,12 +200,12 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult runOn(first) { // by setting something for each key we don't have to worry about NotFound List(KeyA, KeyB, KeyC).foreach { key ⇒ - fullStateReplicator ! Update(key, PNCounter.empty, WriteLocal)(_ + 1) - deltaReplicator ! Update(key, PNCounter.empty, WriteLocal)(_ + 1) + fullStateReplicator ! Update(key, PNCounter.empty, WriteLocal)(_ :+ 1) + deltaReplicator ! Update(key, PNCounter.empty, WriteLocal)(_ :+ 1) } List(KeyD, KeyE, KeyF).foreach { key ⇒ - fullStateReplicator ! Update(key, ORSet.empty[String], WriteLocal)(_ + "a") - deltaReplicator ! Update(key, ORSet.empty[String], WriteLocal)(_ + "a") + fullStateReplicator ! Update(key, ORSet.empty[String], WriteLocal)(_ :+ "a") + deltaReplicator ! 
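For `PNCounter` the pattern is `:+` for increments and `decrement` for decrements, as in the chaos spec above. A standalone sketch (helper name illustrative):

```scala
import akka.cluster.ddata.{ PNCounter, SelfUniqueAddress }

def adjust(c: PNCounter)(implicit node: SelfUniqueAddress): PNCounter =
  (c :+ 20).decrement(1) // previously: (c + 20) - 1
```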
Update(key, ORSet.empty[String], WriteLocal)(_ :+ "a") } } enterBarrier("updated-1") @@ -232,8 +233,8 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult "work with write consistency" in { runOn(first) { val p1 = TestProbe() - fullStateReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "A"), p1.ref) - deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "A"), p1.ref) + fullStateReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ :+ "A"), p1.ref) + deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ :+ "A"), p1.ref) p1.expectMsgType[UpdateSuccess[_]] p1.expectMsgType[UpdateSuccess[_]] } @@ -248,9 +249,9 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult // retry with full state to sort it out runOn(first) { val p1 = TestProbe() - deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "B"), p1.ref) - deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "C"), p1.ref) - deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "D"), p1.ref) + deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ :+ "B"), p1.ref) + deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ :+ "C"), p1.ref) + deltaReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ :+ "D"), p1.ref) p1.expectMsgType[UpdateSuccess[_]] p1.expectMsgType[UpdateSuccess[_]] p1.expectMsgType[UpdateSuccess[_]] @@ -262,7 +263,7 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult // add same to the fullStateReplicator so they are in sync runOn(first) { val p1 = TestProbe() - fullStateReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ + "A" + "B" + "C" + "D"), p1.ref) + fullStateReplicator.tell(Update(KeyD, ORSet.empty[String], writeAll)(_ :+ "A" :+ "B" :+ "C" :+ "D"), p1.ref) p1.expectMsgType[UpdateSuccess[_]] } enterBarrier("write-3") @@ -366,22 +367,22 @@ class ReplicatorDeltaSpec extends MultiNodeSpec(ReplicatorDeltaSpec) with STMult op match { case Delay(d) ⇒ Thread.sleep(d) case Incr(key, n, consistency) ⇒ - fullStateReplicator ! Update(key, PNCounter.empty, consistency)(_ + n) - deltaReplicator ! Update(key, PNCounter.empty, consistency)(_ + n) + fullStateReplicator ! Update(key, PNCounter.empty, consistency)(_ :+ n) + deltaReplicator ! Update(key, PNCounter.empty, consistency)(_ :+ n) case Decr(key, n, consistency) ⇒ - fullStateReplicator ! Update(key, PNCounter.empty, consistency)(_ - n) - deltaReplicator ! Update(key, PNCounter.empty, consistency)(_ - n) + fullStateReplicator ! Update(key, PNCounter.empty, consistency)(_ decrement n) + deltaReplicator ! Update(key, PNCounter.empty, consistency)(_ decrement n) case Add(key, elem, consistency) ⇒ // to have an deterministic result when mixing add/remove we can only perform // the ORSet operations from one node runOn((if (key == KeyF) List(first) else List(first, second, third)): _*) { - fullStateReplicator ! Update(key, ORSet.empty[String], consistency)(_ + elem) - deltaReplicator ! Update(key, ORSet.empty[String], consistency)(_ + elem) + fullStateReplicator ! Update(key, ORSet.empty[String], consistency)(_ :+ elem) + deltaReplicator ! Update(key, ORSet.empty[String], consistency)(_ :+ elem) } case Remove(key, elem, consistency) ⇒ runOn(first) { - fullStateReplicator ! Update(key, ORSet.empty[String], consistency)(_ - elem) - deltaReplicator ! Update(key, ORSet.empty[String], consistency)(_ - elem) + fullStateReplicator ! 
Update(key, ORSet.empty[String], consistency)(_ remove elem) + deltaReplicator ! Update(key, ORSet.empty[String], consistency)(_ remove elem) } } } diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorMapDeltaSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorMapDeltaSpec.scala index 677d7d208e..febf7eebb4 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorMapDeltaSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorMapDeltaSpec.scala @@ -164,11 +164,11 @@ object ReplicatorMapDeltaSpec extends MultiNodeConfig { }.toVector } - def addElementToORMap(om: ORMap[String, ORSet[String]], key: String, element: String)(implicit node: Cluster) = - om.updated(node, key, ORSet.empty[String])(_.add(node, element)) + def addElementToORMap(om: ORMap[String, ORSet[String]], key: String, element: String)(implicit node: SelfUniqueAddress) = + om.updated(node, key, ORSet.empty[String])(_ :+ element) - def removeElementFromORMap(om: ORMap[String, ORSet[String]], key: String, element: String)(implicit node: Cluster) = - om.updated(node, key, ORSet.empty[String])(_.remove(node, element)) + def removeElementFromORMap(om: ORMap[String, ORSet[String]], key: String, element: String)(implicit node: SelfUniqueAddress) = + om.updated(node, key, ORSet.empty[String])(_.remove(element)) } class ReplicatorMapDeltaSpecMultiJvmNode1 extends ReplicatorMapDeltaSpec @@ -182,7 +182,8 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with override def initialParticipants = roles.size - implicit val cluster = Cluster(system) + val cluster = Cluster(system) + implicit val selfUniqueAddress = DistributedData(system).selfUniqueAddress val fullStateReplicator = system.actorOf(Replicator.props( ReplicatorSettings(system).withGossipInterval(1.second).withDeltaCrdtEnabled(false)), "fullStateReplicator") val deltaReplicator = { @@ -241,20 +242,20 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with runOn(first) { // by setting something for each key we don't have to worry about NotFound List(KeyA, KeyB, KeyC).foreach { key ⇒ - fullStateReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ increment key._2) - deltaReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ increment key._2) + fullStateReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_.incrementBy(key._2, 1)) + deltaReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_.incrementBy(key._2, 1)) } List(KeyD, KeyE, KeyF).foreach { key ⇒ - fullStateReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ + (key._2 → Set("a"))) - deltaReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ + (key._2 → Set("a"))) + fullStateReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ :+ (key._2 → Set("a"))) + deltaReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ :+ (key._2 → Set("a"))) } List(KeyG, KeyH, KeyI).foreach { key ⇒ - fullStateReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ + (key._2 → Set("a"))) - deltaReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ + (key._2 → Set("a"))) + fullStateReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ :+ (key._2 → Set("a"))) + deltaReplicator ! 
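The two `ORMap` helpers rewritten above capture the nested-update idiom: `updated` takes the `SelfUniqueAddress` explicitly, while the inner modify function picks it up implicitly. Restated outside the spec:

```scala
import akka.cluster.ddata.{ ORMap, ORSet, SelfUniqueAddress }

def addElement(om: ORMap[String, ORSet[String]], key: String, element: String)(
    implicit node: SelfUniqueAddress): ORMap[String, ORSet[String]] =
  om.updated(node, key, ORSet.empty[String])(_ :+ element)
```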
Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ :+ (key._2 → Set("a"))) } List(KeyJ, KeyK, KeyL).foreach { key ⇒ - fullStateReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(_ + (key._2 → (ORSet.empty + "a"))) - deltaReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(_ + (key._2 → (ORSet.empty + "a"))) + fullStateReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(_ :+ (key._2 → (ORSet.empty :+ "a"))) + deltaReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(_ :+ (key._2 → (ORSet.empty :+ "a"))) } } enterBarrier("updated-1") @@ -271,7 +272,7 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with val p = TestProbe() List(KeyD, KeyE, KeyF).foreach { key ⇒ fullStateReplicator.tell(Get(key._1, ReadLocal), p.ref) - val res = p.expectMsgType[GetSuccess[ORMultiMap[String, String]]].dataValue.get(key._2) should ===(Some(Set("a"))) + p.expectMsgType[GetSuccess[ORMultiMap[String, String]]].dataValue.get(key._2) should ===(Some(Set("a"))) } } awaitAssert { @@ -300,7 +301,7 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with system.eventStream.subscribe(errorLogProbe.ref, classOf[Error]) runOn(first) { for (_ ← 1 to N; key ← List(KeyA, KeyB)) { - ordinaryReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ increment key._2) + ordinaryReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_.incrementBy(key._2, 1)) } } enterBarrier("updated-2") @@ -333,44 +334,44 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with log.debug("operation: {}", op) op match { case Delay(d) ⇒ Thread.sleep(d) - case Incr(key, n, consistency) ⇒ - fullStateReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ increment (key._2, n)) - deltaReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ increment (key._2, n)) - case Decr(key, n, consistency) ⇒ - fullStateReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ decrement (key._2, n)) - deltaReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ decrement (key._2, n)) - case AddVD(key, elem, consistency) ⇒ + case Incr(key, n, _) ⇒ + fullStateReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ incrementBy (key._2, n)) + deltaReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ incrementBy (key._2, n)) + case Decr(key, n, _) ⇒ + fullStateReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ decrementBy (key._2, n)) + deltaReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ decrementBy (key._2, n)) + case AddVD(key, elem, _) ⇒ // to have an deterministic result when mixing add/remove we can only perform // the ORSet operations from one node runOn((if (key == KeyF) List(first) else List(first, second, third)): _*) { - fullStateReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ addBinding (key._2, elem)) - deltaReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ addBinding (key._2, elem)) + fullStateReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ addBindingBy (key._2, elem)) + deltaReplicator ! 
Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ addBindingBy (key._2, elem)) } - case RemoveVD(key, elem, consistency) ⇒ + case RemoveVD(key, elem, _) ⇒ runOn(first) { - fullStateReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ removeBinding (key._2, elem)) - deltaReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ removeBinding (key._2, elem)) + fullStateReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ removeBindingBy (key._2, elem)) + deltaReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ removeBindingBy (key._2, elem)) } - case AddNoVD(key, elem, consistency) ⇒ + case AddNoVD(key, elem, _) ⇒ // to have an deterministic result when mixing add/remove we can only perform // the ORSet operations from one node runOn((if (key == KeyI) List(first) else List(first, second, third)): _*) { - fullStateReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ addBinding (key._2, elem)) - deltaReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ addBinding (key._2, elem)) + fullStateReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ addBindingBy (key._2, elem)) + deltaReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ addBindingBy (key._2, elem)) } - case RemoveNoVD(key, elem, consistency) ⇒ + case RemoveNoVD(key, elem, _) ⇒ runOn(first) { - fullStateReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ removeBinding (key._2, elem)) - deltaReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ removeBinding (key._2, elem)) + fullStateReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ removeBindingBy (key._2, elem)) + deltaReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ removeBindingBy (key._2, elem)) } - case AddOM(key, elem, consistency) ⇒ + case AddOM(key, elem, _) ⇒ // to have an deterministic result when mixing add/remove we can only perform // the ORSet operations from one node runOn((if (key == KeyL) List(first) else List(first, second, third)): _*) { fullStateReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(om ⇒ addElementToORMap(om, key._2, elem)) deltaReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(om ⇒ addElementToORMap(om, key._2, elem)) } - case RemoveOM(key, elem, consistency) ⇒ + case RemoveOM(key, elem, _) ⇒ runOn(first) { fullStateReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(om ⇒ removeElementFromORMap(om, key._2, elem)) deltaReplicator ! 
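`ORMultiMap` bindings move to the `...By` variants (`addBindingBy`, `removeBindingBy`), which take the node implicitly. A sketch with illustrative key and value literals:

```scala
import akka.cluster.ddata.{ ORMultiMap, SelfUniqueAddress }

def rebind(m: ORMultiMap[String, String])(
    implicit node: SelfUniqueAddress): ORMultiMap[String, String] =
  m.addBindingBy("k", "v1").removeBindingBy("k", "v0")
```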
Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(om ⇒ removeElementFromORMap(om, key._2, elem)) diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorORSetDeltaSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorORSetDeltaSpec.scala index 4111e1fe48..bd9e1b5a34 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorORSetDeltaSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorORSetDeltaSpec.scala @@ -42,7 +42,8 @@ class ReplicatorORSetDeltaSpec extends MultiNodeSpec(ReplicatorORSetDeltaSpec) w override def initialParticipants = roles.size - implicit val cluster = Cluster(system) + val cluster = Cluster(system) + implicit val selfUniqueAddress = DistributedData(system).selfUniqueAddress val replicator = system.actorOf(Replicator.props( ReplicatorSettings(system).withGossipInterval(1.second)), "replicator") val timeout = 3.seconds.dilated @@ -88,7 +89,7 @@ class ReplicatorORSetDeltaSpec extends MultiNodeSpec(ReplicatorORSetDeltaSpec) w } runOn(first) { - replicator ! Update(KeyA, ORSet.empty[String], WriteLocal)(_ + "a") + replicator ! Update(KeyA, ORSet.empty[String], WriteLocal)(_ :+ "a") expectMsg(UpdateSuccess(KeyA, None)) } @@ -108,11 +109,11 @@ class ReplicatorORSetDeltaSpec extends MultiNodeSpec(ReplicatorORSetDeltaSpec) w enterBarrier("split") runOn(first) { - replicator ! Update(KeyA, ORSet.empty[String], WriteLocal)(_ + "b") + replicator ! Update(KeyA, ORSet.empty[String], WriteLocal)(_ :+ "b") expectMsg(UpdateSuccess(KeyA, None)) } runOn(second) { - replicator ! Update(KeyA, ORSet.empty[String], WriteLocal)(_ + "d") + replicator ! Update(KeyA, ORSet.empty[String], WriteLocal)(_ :+ "d") expectMsg(UpdateSuccess(KeyA, None)) } runOn(first, second) { @@ -129,7 +130,7 @@ class ReplicatorORSetDeltaSpec extends MultiNodeSpec(ReplicatorORSetDeltaSpec) w runOn(first) { // delta for "c" will be sent to third, but it has not received the previous delta for "b" - replicator ! Update(KeyA, ORSet.empty[String], WriteLocal)(_ + "c") + replicator ! Update(KeyA, ORSet.empty[String], WriteLocal)(_ :+ "c") expectMsg(UpdateSuccess(KeyA, None)) // let the delta be propagated (will not fail if it takes longer) Thread.sleep(1000) @@ -154,7 +155,7 @@ class ReplicatorORSetDeltaSpec extends MultiNodeSpec(ReplicatorORSetDeltaSpec) w // and now the delta seqNr should be in sync again runOn(first) { - replicator ! Update(KeyA, ORSet.empty[String], WriteLocal)(_ + "e") + replicator ! 
Update(KeyA, ORSet.empty[String], WriteLocal)(_ :+ "e") expectMsg(UpdateSuccess(KeyA, None)) } assertValue(KeyA, Set("a", "b", "c", "d", "e")) diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorPruningSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorPruningSpec.scala index 1c641455e1..d39f3e3fc8 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorPruningSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorPruningSpec.scala @@ -38,7 +38,8 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST override def initialParticipants = roles.size - implicit val cluster = Cluster(system) + val cluster = Cluster(system) + implicit val selfUniqueAddress = DistributedData(system).selfUniqueAddress val maxPruningDissemination = 3.seconds val replicator = system.actorOf(Replicator.props( ReplicatorSettings(system).withGossipInterval(1.second) @@ -83,19 +84,19 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST member.uniqueAddress } - replicator ! Update(KeyA, GCounter(), WriteAll(timeout))(_ + 3) + replicator ! Update(KeyA, GCounter(), WriteAll(timeout))(_ :+ 3) expectMsg(UpdateSuccess(KeyA, None)) - replicator ! Update(KeyB, ORSet(), WriteAll(timeout))(_ + "a" + "b" + "c") + replicator ! Update(KeyB, ORSet(), WriteAll(timeout))(_ :+ "a" :+ "b" :+ "c") expectMsg(UpdateSuccess(KeyB, None)) - replicator ! Update(KeyC, PNCounterMap.empty[String], WriteAll(timeout)) { _ increment "x" increment "y" } + replicator ! Update(KeyC, PNCounterMap.empty[String], WriteAll(timeout)) { _.incrementBy("x", 1).incrementBy("y", 1) } expectMsg(UpdateSuccess(KeyC, None)) - replicator ! Update(KeyD, ORMultiMap.empty[String, String], WriteAll(timeout)) { _ + ("a" → Set("A")) } + replicator ! Update(KeyD, ORMultiMap.empty[String, String], WriteAll(timeout)) { _ :+ ("a" → Set("A")) } expectMsg(UpdateSuccess(KeyD, None)) - replicator ! Update(KeyE, ORMap.empty[String, GSet[String]], WriteAll(timeout)) { _ + ("a" → GSet.empty[String].add("A")) } + replicator ! Update(KeyE, ORMap.empty[String, GSet[String]], WriteAll(timeout)) { _ :+ ("a" → GSet.empty[String].add("A")) } expectMsg(UpdateSuccess(KeyE, None)) enterBarrier("updates-done") @@ -125,7 +126,7 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST enterBarrier("get-old") runOn(third) { - replicator ! Update(KeyE, ORMap.empty[String, GSet[String]], WriteLocal) { _ - "a" } + replicator ! Update(KeyE, ORMap.empty[String, GSet[String]], WriteLocal) { _ remove "a" } expectMsg(UpdateSuccess(KeyE, None)) } @@ -206,7 +207,7 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST def updateAfterPruning(expectedValue: Int): Unit = { replicator ! 
Update(KeyA, GCounter(), WriteAll(timeout), None) { existing ⇒ // inject data from removed node to simulate bad data - existing.merge(oldCounter) + 1 + existing.merge(oldCounter) :+ 1 } expectMsgPF() { case UpdateSuccess(KeyA, _) ⇒ diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala index ca1cdf3f44..4ec911bdfa 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala @@ -40,7 +40,8 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec override def initialParticipants = roles.size - implicit val cluster = Cluster(system) + val cluster = Cluster(system) + implicit val selfUniqueAddress = DistributedData(system).selfUniqueAddress val replicator = system.actorOf(Replicator.props( ReplicatorSettings(system).withGossipInterval(1.second).withMaxDeltaElements(10)), "replicator") val timeout = 3.seconds.dilated @@ -100,8 +101,8 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec replicator ! Get(KeyA, ReadLocal) expectMsg(NotFound(KeyA, None)) - val c3 = GCounter() + 3 - replicator ! Update(KeyA, GCounter(), WriteLocal)(_ + 3) + val c3 = GCounter() :+ 3 + replicator ! Update(KeyA, GCounter(), WriteLocal)(_ :+ 3) expectMsg(UpdateSuccess(KeyA, None)) replicator ! Get(KeyA, ReadLocal) expectMsg(GetSuccess(KeyA, None)(c3)).dataValue should be(c3) @@ -111,31 +112,31 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec replicator ! Subscribe(KeyA, changedProbe2.ref) changedProbe2.expectMsg(Changed(KeyA)(c3)).dataValue should be(c3) - val c4 = c3 + 1 + val c4 = c3 :+ 1 // too strong consistency level - replicator ! Update(KeyA, GCounter(), writeTwo)(_ + 1) + replicator ! Update(KeyA, GCounter(), writeTwo)(_ :+ 1) expectMsg(timeout + 1.second, UpdateTimeout(KeyA, None)) replicator ! Get(KeyA, ReadLocal) expectMsg(GetSuccess(KeyA, None)(c4)).dataValue should be(c4) changedProbe.expectMsg(Changed(KeyA)(c4)).dataValue should be(c4) - val c5 = c4 + 1 + val c5 = c4 :+ 1 // too strong consistency level - replicator ! Update(KeyA, GCounter(), writeMajority)(_ + 1) + replicator ! Update(KeyA, GCounter(), writeMajority)(_ :+ 1) expectMsg(UpdateSuccess(KeyA, None)) replicator ! Get(KeyA, readMajority) expectMsg(GetSuccess(KeyA, None)(c5)).dataValue should be(c5) changedProbe.expectMsg(Changed(KeyA)(c5)).dataValue should be(c5) - val c6 = c5 + 1 - replicator ! Update(KeyA, GCounter(), writeAll)(_ + 1) + val c6 = c5 :+ 1 + replicator ! Update(KeyA, GCounter(), writeAll)(_ :+ 1) expectMsg(UpdateSuccess(KeyA, None)) replicator ! Get(KeyA, readAll) expectMsg(GetSuccess(KeyA, None)(c6)).dataValue should be(c6) changedProbe.expectMsg(Changed(KeyA)(c6)).dataValue should be(c6) - val c9 = GCounter() + 9 - replicator ! Update(KeyX, GCounter(), WriteLocal)(_ + 9) + val c9 = GCounter() :+ 9 + replicator ! Update(KeyX, GCounter(), WriteLocal)(_ :+ 9) expectMsg(UpdateSuccess(KeyX, None)) changedProbe.expectMsg(Changed(KeyX)(c9)).dataValue should be(c9) replicator ! Delete(KeyX, WriteLocal, Some(777)) @@ -145,7 +146,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec expectMsg(DataDeleted(KeyX, Some(789))) replicator ! Get(KeyX, readAll, Some(456)) expectMsg(DataDeleted(KeyX, Some(456))) - replicator ! 
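`PNCounterMap` correspondingly gains `incrementBy` and `decrementBy`, as used in the pruning spec above. A standalone sketch (helper name illustrative):

```scala
import akka.cluster.ddata.{ PNCounterMap, SelfUniqueAddress }

def tally(implicit node: SelfUniqueAddress): Option[BigInt] = {
  val m = PNCounterMap.empty[String]
    .incrementBy("x", 1)
    .incrementBy("y", 1)
    .decrementBy("x", 1)
  m.get("x") // Some(0)
}
```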
Update(KeyX, GCounter(), WriteLocal, Some(123))(_ + 1) + replicator ! Update(KeyX, GCounter(), WriteLocal, Some(123))(_ :+ 1) expectMsg(DataDeleted(KeyX, Some(123))) replicator ! Delete(KeyX, WriteLocal, Some(555)) expectMsg(DataDeleted(KeyX, Some(555))) @@ -214,11 +215,11 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec runOn(first, second) { // start with 20 on both nodes - replicator ! Update(KeyB, GCounter(), WriteLocal)(_ + 20) + replicator ! Update(KeyB, GCounter(), WriteLocal)(_ :+ 20) expectMsg(UpdateSuccess(KeyB, None)) // add 1 on both nodes using WriteTwo - replicator ! Update(KeyB, GCounter(), writeTwo)(_ + 1) + replicator ! Update(KeyB, GCounter(), writeTwo)(_ :+ 1) expectMsg(UpdateSuccess(KeyB, None)) // the total, after replication should be 42 @@ -232,7 +233,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec runOn(first, second) { // add 1 on both nodes using WriteAll - replicator ! Update(KeyB, GCounter(), writeAll)(_ + 1) + replicator ! Update(KeyB, GCounter(), writeAll)(_ :+ 1) expectMsg(UpdateSuccess(KeyB, None)) // the total, after replication should be 44 @@ -246,7 +247,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec runOn(first, second) { // add 1 on both nodes using WriteMajority - replicator ! Update(KeyB, GCounter(), writeMajority)(_ + 1) + replicator ! Update(KeyB, GCounter(), writeMajority)(_ :+ 1) expectMsg(UpdateSuccess(KeyB, None)) // the total, after replication should be 46 @@ -267,14 +268,14 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec } runOn(first) { - replicator ! Update(KeyC, GCounter(), writeTwo)(_ + 30) + replicator ! Update(KeyC, GCounter(), writeTwo)(_ :+ 30) expectMsg(UpdateSuccess(KeyC, None)) changedProbe.expectMsgPF() { case c @ Changed(KeyC) ⇒ c.get(KeyC).value } should be(30) - replicator ! Update(KeyY, GCounter(), writeTwo)(_ + 30) + replicator ! Update(KeyY, GCounter(), writeTwo)(_ :+ 30) expectMsg(UpdateSuccess(KeyY, None)) - replicator ! Update(KeyZ, GCounter(), writeMajority)(_ + 30) + replicator ! Update(KeyZ, GCounter(), writeMajority)(_ :+ 30) expectMsg(UpdateSuccess(KeyZ, None)) } enterBarrier("update-c30") @@ -286,7 +287,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec changedProbe.expectMsgPF() { case c @ Changed(KeyC) ⇒ c.get(KeyC).value } should be(30) // replicate with gossip after WriteLocal - replicator ! Update(KeyC, GCounter(), WriteLocal)(_ + 1) + replicator ! Update(KeyC, GCounter(), WriteLocal)(_ :+ 1) expectMsg(UpdateSuccess(KeyC, None)) changedProbe.expectMsgPF() { case c @ Changed(KeyC) ⇒ c.get(KeyC).value } should be(31) @@ -320,7 +321,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec val c31 = expectMsgPF() { case g @ GetSuccess(KeyC, _) ⇒ g.get(KeyC) } c31.value should be(31) - replicator ! Update(KeyC, GCounter(), WriteLocal)(_ + 1) + replicator ! Update(KeyC, GCounter(), WriteLocal)(_ :+ 1) expectMsg(UpdateSuccess(KeyC, None)) within(5.seconds) { @@ -337,7 +338,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec "converge after partition" in { runOn(first) { - replicator ! Update(KeyD, GCounter(), writeTwo)(_ + 40) + replicator ! 
Update(KeyD, GCounter(), writeTwo)(_ :+ 40) expectMsg(UpdateSuccess(KeyD, None)) testConductor.blackhole(first, second, Direction.Both).await @@ -348,15 +349,15 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec replicator ! Get(KeyD, ReadLocal) val c40 = expectMsgPF() { case g @ GetSuccess(KeyD, _) ⇒ g.get(KeyD) } c40.value should be(40) - replicator ! Update(KeyD, GCounter() + 1, writeTwo)(_ + 1) + replicator ! Update(KeyD, GCounter() :+ 1, writeTwo)(_ :+ 1) expectMsg(timeout + 1.second, UpdateTimeout(KeyD, None)) - replicator ! Update(KeyD, GCounter(), writeTwo)(_ + 1) + replicator ! Update(KeyD, GCounter(), writeTwo)(_ :+ 1) expectMsg(timeout + 1.second, UpdateTimeout(KeyD, None)) } runOn(first) { for (n ← 1 to 30) { val KeyDn = GCounterKey("D" + n) - replicator ! Update(KeyDn, GCounter(), WriteLocal)(_ + n) + replicator ! Update(KeyDn, GCounter(), WriteLocal)(_ :+ n) expectMsg(UpdateSuccess(KeyDn, None)) } } @@ -400,7 +401,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec enterBarrier("3-nodes") runOn(first, second, third) { - replicator ! Update(KeyE, GCounter(), writeMajority)(_ + 50) + replicator ! Update(KeyE, GCounter(), writeMajority)(_ :+ 50) expectMsg(UpdateSuccess(KeyE, None)) } enterBarrier("write-initial-majority") @@ -419,7 +420,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec enterBarrier("blackhole-third") runOn(second) { - replicator ! Update(KeyE, GCounter(), WriteLocal)(_ + 1) + replicator ! Update(KeyE, GCounter(), WriteLocal)(_ :+ 1) expectMsg(UpdateSuccess(KeyE, None)) } enterBarrier("local-update-from-second") @@ -432,7 +433,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec probe2.expectMsgType[GetSuccess[_]] replicator.tell(Update(KeyE, GCounter(), writeMajority, None) { data ⇒ probe1.ref ! data.value - data + 1 + data :+ 1 }, probe2.ref) // verify read your own writes, without waiting for the UpdateSuccess reply // note that the order of the replies are not defined, and therefore we use separate probes @@ -449,13 +450,13 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec val probe1 = TestProbe() replicator.tell(Get(KeyE, readMajority), probe1.ref) probe1.expectMsgType[GetSuccess[_]] - replicator.tell(Update(KeyE, GCounter(), writeMajority, Some(153))(_ + 1), probe1.ref) + replicator.tell(Update(KeyE, GCounter(), writeMajority, Some(153))(_ :+ 1), probe1.ref) // verify read your own writes, without waiting for the UpdateSuccess reply // note that the order of the replies are not defined, and therefore we use separate probes val probe2 = TestProbe() - replicator.tell(Update(KeyE, GCounter(), writeMajority, Some(154))(_ + 1), probe2.ref) + replicator.tell(Update(KeyE, GCounter(), writeMajority, Some(154))(_ :+ 1), probe2.ref) val probe3 = TestProbe() - replicator.tell(Update(KeyE, GCounter(), writeMajority, Some(155))(_ + 1), probe3.ref) + replicator.tell(Update(KeyE, GCounter(), writeMajority, Some(155))(_ :+ 1), probe3.ref) val probe5 = TestProbe() replicator.tell(Get(KeyE, readMajority), probe5.ref) probe1.expectMsg(UpdateSuccess(KeyE, Some(153))) @@ -492,9 +493,9 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec "converge after many concurrent updates" in within(10.seconds) { runOn(first, second, third) { var c = GCounter() - for (i ← 0 until 100) { - c += 1 - replicator ! 
Update(KeyF, GCounter(), writeTwo)(_ + 1) + for (_ ← 0 until 100) { + c :+= 1 + replicator ! Update(KeyF, GCounter(), writeTwo)(_ :+ 1) } val results = receiveN(100) results.map(_.getClass).toSet should be(Set(classOf[UpdateSuccess[_]])) @@ -510,7 +511,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec "read-repair happens before GetSuccess" in { runOn(first) { - replicator ! Update(KeyG, ORSet(), writeTwo)(_ + "a" + "b") + replicator ! Update(KeyG, ORSet(), writeTwo)(_ :+ "a" :+ "b") expectMsgType[UpdateSuccess[_]] } enterBarrier("a-b-added-to-G") @@ -528,20 +529,20 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec runOn(second) { replicator ! Subscribe(KeyH, changedProbe.ref) - replicator ! Update(KeyH, ORMap.empty[String, Flag], writeTwo)(_ + ("a" → Flag.Disabled)) + replicator ! Update(KeyH, ORMap.empty[String, Flag], writeTwo)(_ :+ ("a" → Flag.Disabled)) changedProbe.expectMsgPF() { case c @ Changed(KeyH) ⇒ c.get(KeyH).entries } should be(Map("a" → Flag.Disabled)) } enterBarrier("update-h1") runOn(first) { - replicator ! Update(KeyH, ORMap.empty[String, Flag], writeTwo)(_ + ("a" → Flag.Enabled)) + replicator ! Update(KeyH, ORMap.empty[String, Flag], writeTwo)(_ :+ ("a" → Flag.Enabled)) } runOn(second) { changedProbe.expectMsgPF() { case c @ Changed(KeyH) ⇒ c.get(KeyH).entries } should be(Map("a" → Flag.Enabled)) - replicator ! Update(KeyH, ORMap.empty[String, Flag], writeTwo)(_ + ("b" → Flag.Enabled)) + replicator ! Update(KeyH, ORMap.empty[String, Flag], writeTwo)(_ :+ ("b" → Flag.Enabled)) changedProbe.expectMsgPF() { case c @ Changed(KeyH) ⇒ c.get(KeyH).entries } should be( Map("a" → Flag.Enabled, "b" → Flag.Enabled)) } @@ -568,7 +569,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec replicator ! 
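Note that compound assignment keeps working after the operator rename, since `c :+= 1` desugars to `c = c :+ 1`. A small sketch:

```scala
import akka.cluster.ddata.{ GCounter, SelfUniqueAddress }

def countTo(n: Int)(implicit node: SelfUniqueAddress): BigInt = {
  var c = GCounter()
  for (_ ← 0 until n) c :+= 1 // desugars to c = c :+ 1
  c.value
}
```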
Update(KeyI, GSet.empty[String], writeTwo)(_ + "a") } - changedProbe.expectNoMsg(1.second) + changedProbe.expectNoMessage(1.second) enterBarrierAfterTestStep() } diff --git a/akka-distributed-data/src/test/java/akka/cluster/ddata/ORMapTest.java b/akka-distributed-data/src/test/java/akka/cluster/ddata/ORMapTest.java index 0b392127a5..b131eb0008 100644 --- a/akka-distributed-data/src/test/java/akka/cluster/ddata/ORMapTest.java +++ b/akka-distributed-data/src/test/java/akka/cluster/ddata/ORMapTest.java @@ -4,13 +4,11 @@ package akka.cluster.ddata; -import akka.cluster.Cluster; - public class ORMapTest { public void compileOnlyORMapTest() { // primarily to check API accessibility with overloads/types - Cluster node1 = null; + SelfUniqueAddress node1 = null; ORMap<String, ORSet<String>> orMap = ORMap.create(); // updated needs a cast diff --git a/akka-distributed-data/src/test/java/akka/cluster/ddata/ORMultiMapTest.java b/akka-distributed-data/src/test/java/akka/cluster/ddata/ORMultiMapTest.java index caa4e88688..3043ed3b26 100644 --- a/akka-distributed-data/src/test/java/akka/cluster/ddata/ORMultiMapTest.java +++ b/akka-distributed-data/src/test/java/akka/cluster/ddata/ORMultiMapTest.java @@ -4,13 +4,11 @@ package akka.cluster.ddata; -import akka.cluster.Cluster; - public class ORMultiMapTest { public void compileOnlyORMultiMapTest() { // primarily to check API accessibility with overloads/types - Cluster node = null; + SelfUniqueAddress node = null; ORMultiMap<String, String> orMultiMap = ORMultiMap.create(); orMultiMap.addBinding(node, "a", "1"); orMultiMap.removeBinding(node, "a", "1"); diff --git a/akka-distributed-data/src/test/java/akka/cluster/ddata/PNCounterTest.java b/akka-distributed-data/src/test/java/akka/cluster/ddata/PNCounterTest.java index 2b4a2cf516..fffd4f8547 100644 --- a/akka-distributed-data/src/test/java/akka/cluster/ddata/PNCounterTest.java +++ b/akka-distributed-data/src/test/java/akka/cluster/ddata/PNCounterTest.java @@ -4,8 +4,6 @@ package akka.cluster.ddata; -import akka.cluster.Cluster; - import java.math.BigInteger; public class PNCounterTest { @@ -13,8 +11,8 @@ public class PNCounterTest { public void compileOnlyPNCounterApiTest() { // primarily to check API accessibility with overloads/types - Cluster node1 = null; - Cluster node2 = null; + SelfUniqueAddress node1 = null; + SelfUniqueAddress node2 = null; PNCounter c1 = PNCounter.create(); diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/LWWRegisterSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/LWWRegisterSpec.scala index 4caf7d06af..1ddec798f3 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/LWWRegisterSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/LWWRegisterSpec.scala @@ -56,17 +56,19 @@ class LWWRegisterSpec extends WordSpec with Matchers { } "use monotonically increasing defaultClock" in { - (1 to 100).foldLeft(LWWRegister(node1, 0, defaultClock)) { + implicit val node = SelfUniqueAddress(node1) + + (1 to 100).foldLeft(LWWRegister.create(0)) { case (r, n) ⇒ r.value should be(n - 1) - val r2 = r.withValue(node1, n, defaultClock[Int]) + val r2 = r.withValueOf(n) r2.timestamp should be > r.timestamp r2 } } "have unapply extractor" in { - val r1 = LWWRegister(node1, "a", defaultClock) + val r1 = LWWRegister(node1, "a", defaultClock[String]) val LWWRegister(value1) = r1 val value2: String = value1 Changed(LWWRegisterKey[String]("key"))(r1) match { diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/LocalConcurrencySpec.scala
b/akka-distributed-data/src/test/scala/akka/cluster/ddata/LocalConcurrencySpec.scala index 9af26fcf6d..3915b84a39 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/LocalConcurrencySpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/LocalConcurrencySpec.scala @@ -8,7 +8,6 @@ import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props import akka.actor.Stash -import akka.cluster.Cluster import akka.testkit.ImplicitSender import akka.testkit.TestKit import com.typesafe.config.ConfigFactory @@ -25,12 +24,14 @@ object LocalConcurrencySpec { } class Updater extends Actor with Stash { - implicit val cluster = Cluster(context.system) + + implicit val selfUniqueAddress = DistributedData(context.system).selfUniqueAddress + val replicator = DistributedData(context.system).replicator def receive = { case s: String ⇒ - val update = Replicator.Update(Updater.key, ORSet.empty[String], Replicator.WriteLocal)(_ + s) + val update = Replicator.Update(Updater.key, ORSet.empty[String], Replicator.WriteLocal)(_ :+ s) replicator ! update } } diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/LotsOfDataBot.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/LotsOfDataBot.scala index d9b29f4c5b..f93ec69ffb 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/LotsOfDataBot.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/LotsOfDataBot.scala @@ -4,20 +4,14 @@ package akka.cluster.ddata -import scala.concurrent.duration._ import java.util.concurrent.ThreadLocalRandom + +import scala.concurrent.duration._ + import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorSystem import akka.actor.Props -import akka.cluster.Cluster -import akka.cluster.ddata.Replicator.Changed -import akka.cluster.ddata.Replicator.GetKeyIds -import akka.cluster.ddata.Replicator.GetKeyIdsResult -import akka.cluster.ddata.Replicator.Subscribe -import akka.cluster.ddata.Replicator.Update -import akka.cluster.ddata.Replicator.UpdateResponse -import akka.cluster.ddata.Replicator.WriteLocal import com.typesafe.config.ConfigFactory /** @@ -75,8 +69,8 @@ class LotsOfDataBot extends Actor with ActorLogging { import LotsOfDataBot._ import Replicator._ + implicit val selfUniqueAddress = DistributedData(context.system).selfUniqueAddress val replicator = DistributedData(context.system).replicator - implicit val cluster = Cluster(context.system) import context.dispatcher val isPassive = context.system.settings.config.getBoolean("passive") @@ -110,10 +104,10 @@ class LotsOfDataBot extends Actor with ActorLogging { val s = ThreadLocalRandom.current().nextInt(97, 123).toChar.toString if (count <= maxEntries || ThreadLocalRandom.current().nextBoolean()) { // add - replicator ! Update(key, ORSet(), WriteLocal)(_ + s) + replicator ! Update(key, ORSet(), WriteLocal)(_ :+ s) } else { // remove - replicator ! Update(key, ORSet(), WriteLocal)(_ - s) + replicator ! 
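Inside an actor the implicit is typically taken from `context.system`, as `LocalConcurrencySpec`'s `Updater` now does. A minimal actor along those lines (class and key names illustrative):

```scala
import akka.actor.{ Actor, ActorRef }
import akka.cluster.ddata._
import akka.cluster.ddata.Replicator.{ Update, WriteLocal }

class UpdaterSketch extends Actor {
  implicit val node: SelfUniqueAddress =
    DistributedData(context.system).selfUniqueAddress
  val replicator: ActorRef = DistributedData(context.system).replicator
  val Key = ORSetKey[String]("set")

  def receive = {
    case s: String ⇒
      replicator ! Update(Key, ORSet.empty[String], WriteLocal)(_ :+ s)
  }
}
```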
Update(key, ORSet(), WriteLocal)(_ remove s) } } diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterMapSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterMapSpec.scala index 61bb080465..93c333ec1d 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterMapSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterMapSpec.scala @@ -17,6 +17,11 @@ class PNCounterMapSpec extends WordSpec with Matchers { "A PNCounterMap" must { + "be able to increment and decrement entries with implicit SelfUniqueAddress" in { + implicit val node = SelfUniqueAddress(node1) + PNCounterMap().incrementBy("a", 2).incrementBy("b", 1).incrementBy("b", 2).decrementBy("a", 1).entries should be(Map("a" → 1, "b" → 3)) + } + "be able to increment and decrement entries" in { val m = PNCounterMap().increment(node1, "a", 2).increment(node1, "b", 3).decrement(node2, "a", 1) m.entries should be(Map("a" → 1, "b" → 3)) diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ReplicatorSettingsSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ReplicatorSettingsSpec.scala new file mode 100644 index 0000000000..31e768bdc1 --- /dev/null +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ReplicatorSettingsSpec.scala @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.cluster.ddata + +import akka.testkit.AkkaSpec +import com.typesafe.config.ConfigFactory +import org.scalatest.{ BeforeAndAfterAll, WordSpecLike } + +object ReplicatorSettingsSpec { + + val config = ConfigFactory.parseString(""" + akka.actor.provider = "cluster" + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + akka.remote.artery.canonical.hostname = 127.0.0.1""") +} + +class ReplicatorSettingsSpec extends AkkaSpec(ReplicatorSettingsSpec.config) + with WordSpecLike with BeforeAndAfterAll { + + "DistributedData" must { + "have the default replicator name" in { + ReplicatorSettings.name(system, None) should ===("ddataReplicator") + } + "have the prefixed replicator name" in { + ReplicatorSettings.name(system, Some("other")) should ===("otherDdataReplicator") + } + } +} diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala index 2ef582728d..2089527cd8 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala @@ -186,7 +186,7 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem( } "serialize LWWRegister" in { - checkSerialization(LWWRegister(address1, "value1", LWWRegister.defaultClock)) + checkSerialization(LWWRegister(address1, "value1", LWWRegister.defaultClock[String])) checkSerialization(LWWRegister(address1, "value2", LWWRegister.defaultClock[String]) .withValue(address2, "value3", LWWRegister.defaultClock[String])) } diff --git a/akka-docs/src/main/paradox/typed/distributed-data.md b/akka-docs/src/main/paradox/typed/distributed-data.md index cadf5e5ef4..50029d1406 100644 --- a/akka-docs/src/main/paradox/typed/distributed-data.md +++ b/akka-docs/src/main/paradox/typed/distributed-data.md @@ -39,8 +39,8 @@ actor provides the API for interacting with the data and is accessed through the The messages for the replicator, such as `Replicator.Update` are 
defined in @scala[`akka.cluster.ddata.typed.scaladsl.Replicator`] @java[`akka.cluster.ddata.typed.javadsl.Replicator`] but the actual CRDTs are the -same as in untyped, for example `akka.cluster.ddata.GCounter`. This will require an @scala[implicit] untyped `Cluster` -for now, we hope to improve this in the future ([issue #25746](https://github.com/akka/akka/issues/25746)). +same as in untyped, for example `akka.cluster.ddata.GCounter`. This will require an @scala[implicit] `akka.cluster.ddata.SelfUniqueAddress`, +available from @scala[`implicit val node = DistributedData(system).selfUniqueAddress`]@java[`SelfUniqueAddress node = DistributedData.get(system).selfUniqueAddress();`]. The replicator can contain multiple entries each containing a replicated data type, we therefore need to create a key identifying the entry and helping us know what type it has, and then use that key for every interaction with diff --git a/akka-docs/src/test/java/jdocs/ddata/DistributedDataDocTest.java b/akka-docs/src/test/java/jdocs/ddata/DistributedDataDocTest.java index 286b38e843..0223f80c89 100644 --- a/akka-docs/src/test/java/jdocs/ddata/DistributedDataDocTest.java +++ b/akka-docs/src/test/java/jdocs/ddata/DistributedDataDocTest.java @@ -50,7 +50,7 @@ public class DistributedDataDocTest extends AbstractJavaTest { static //#update class DemonstrateUpdate extends AbstractActor { - final Cluster node = Cluster.get(getContext().getSystem()); + final SelfUniqueAddress node = DistributedData.get(getContext().getSystem()).selfUniqueAddress(); final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator(); @@ -305,7 +305,7 @@ public class DistributedDataDocTest extends AbstractJavaTest { public void demonstratePNCounter() { //#pncounter - final Cluster node = Cluster.get(system); + final SelfUniqueAddress node = DistributedData.get(system).selfUniqueAddress(); final PNCounter c0 = PNCounter.create(); final PNCounter c1 = c0.increment(node, 1); final PNCounter c2 = c1.increment(node, 7); @@ -316,7 +316,7 @@ public class DistributedDataDocTest extends AbstractJavaTest { public void demonstratePNCounterMap() { //#pncountermap - final Cluster node = Cluster.get(system); + final SelfUniqueAddress node = DistributedData.get(system).selfUniqueAddress(); final PNCounterMap<String> m0 = PNCounterMap.create(); final PNCounterMap<String> m1 = m0.increment(node, "a", 7); final PNCounterMap<String> m2 = m1.decrement(node, "a", 2); @@ -349,7 +349,7 @@ public class DistributedDataDocTest extends AbstractJavaTest { public void demonstrateORMultiMap() { //#ormultimap - final Cluster node = Cluster.get(system); + final SelfUniqueAddress node = DistributedData.get(system).selfUniqueAddress(); final ORMultiMap<String, Integer> m0 = ORMultiMap.create(); final ORMultiMap<String, Integer> m1 = m0.put(node, "a", new HashSet<>(Arrays.asList(1, 2, 3))); @@ -371,7 +371,7 @@ public class DistributedDataDocTest extends AbstractJavaTest { @Test public void demonstrateLWWRegister() { //#lwwregister - final Cluster node = Cluster.get(system); + final SelfUniqueAddress node = DistributedData.get(system).selfUniqueAddress(); final LWWRegister<String> r1 = LWWRegister.create(node, "Hello"); final LWWRegister<String> r2 = r1.withValue(node, "Hi"); System.out.println(r1.value() + " by " + r1.updatedBy() + " at " + r1.timestamp()); @@ -398,7 +398,7 @@ public class DistributedDataDocTest extends AbstractJavaTest { public void demonstrateLWWRegisterWithCustomClock() { //#lwwregister-custom-clock - final Cluster node = Cluster.get(system); + final SelfUniqueAddress node =
diff --git a/akka-docs/src/test/java/jdocs/ddata/ShoppingCart.java b/akka-docs/src/test/java/jdocs/ddata/ShoppingCart.java
index 67f9d47404..78678df091 100644
--- a/akka-docs/src/test/java/jdocs/ddata/ShoppingCart.java
+++ b/akka-docs/src/test/java/jdocs/ddata/ShoppingCart.java
@@ -13,12 +13,7 @@ import java.time.Duration;
 import akka.actor.AbstractActor;
 import akka.actor.ActorRef;
 import akka.actor.Props;
-import akka.cluster.Cluster;
-import akka.cluster.ddata.DistributedData;
-import akka.cluster.ddata.Key;
-import akka.cluster.ddata.LWWMap;
-import akka.cluster.ddata.LWWMapKey;
-import akka.cluster.ddata.Replicator;
+import akka.cluster.ddata.*;
 import akka.cluster.ddata.Replicator.GetFailure;
 import akka.cluster.ddata.Replicator.GetResponse;
 import akka.cluster.ddata.Replicator.GetSuccess;
@@ -126,7 +121,7 @@ public class ShoppingCart extends AbstractActor {
   }
 
   private final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();
-  private final Cluster node = Cluster.get(getContext().getSystem());
+  private final SelfUniqueAddress node = DistributedData.get(getContext().getSystem()).selfUniqueAddress();
 
   @SuppressWarnings("unused")
   private final String userId;
diff --git a/akka-docs/src/test/scala/docs/ddata/DistributedDataDocSpec.scala b/akka-docs/src/test/scala/docs/ddata/DistributedDataDocSpec.scala
index 8a62692e12..09bbca8201 100644
--- a/akka-docs/src/test/scala/docs/ddata/DistributedDataDocSpec.scala
+++ b/akka-docs/src/test/scala/docs/ddata/DistributedDataDocSpec.scala
@@ -5,8 +5,8 @@ package docs.ddata
 
 import scala.concurrent.duration._
+
 import akka.actor.Actor
-import akka.cluster.Cluster
 import akka.cluster.ddata._
 import akka.testkit.AkkaSpec
 import akka.testkit.TestProbe
@@ -63,7 +63,7 @@ object DistributedDataDocSpec {
     import DataBot._
 
     val replicator = DistributedData(context.system).replicator
-    implicit val node = Cluster(context.system)
+    implicit val node = DistributedData(context.system).selfUniqueAddress
 
     import context.dispatcher
     val tickTask = context.system.scheduler.schedule(5.seconds, 5.seconds, self, Tick)
@@ -78,11 +78,11 @@ object DistributedDataDocSpec {
       if (ThreadLocalRandom.current().nextBoolean()) {
         // add
         log.info("Adding: {}", s)
-        replicator ! Update(DataKey, ORSet.empty[String], WriteLocal)(_ + s)
+        replicator ! Update(DataKey, ORSet.empty[String], WriteLocal)(_ :+ s)
       } else {
         // remove
         log.info("Removing: {}", s)
-        replicator ! Update(DataKey, ORSet.empty[String], WriteLocal)(_ - s)
+        replicator ! Update(DataKey, ORSet.empty[String], WriteLocal)(_ remove s)
       }
 
     case _: UpdateResponse[_] ⇒ // ignore
@@ -107,7 +107,7 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
     implicit val self = probe.ref
 
     //#update
-    implicit val node = Cluster(system)
+    implicit val node = DistributedData(system).selfUniqueAddress
     val replicator = DistributedData(system).replicator
 
     val Counter1Key = PNCounterKey("counter1")
@@ -115,13 +115,13 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
     val Set2Key = ORSetKey[String]("set2")
     val ActiveFlagKey = FlagKey("active")
 
-    replicator ! Update(Counter1Key, PNCounter(), WriteLocal)(_ + 1)
+    replicator ! Update(Counter1Key, PNCounter(), WriteLocal)(_ :+ 1)
 
     val writeTo3 = WriteTo(n = 3, timeout = 1.second)
     replicator ! Update(Set1Key, GSet.empty[String], writeTo3)(_ + "hello")
 
     val writeMajority = WriteMajority(timeout = 5.seconds)
-    replicator ! Update(Set2Key, ORSet.empty[String], writeMajority)(_ + "hello")
+    replicator ! Update(Set2Key, ORSet.empty[String], writeMajority)(_ :+ "hello")
 
     val writeAll = WriteAll(timeout = 5.seconds)
     replicator ! Update(ActiveFlagKey, Flag.Disabled, writeAll)(_.switchOn)
@@ -152,7 +152,7 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
       def sender() = self
 
       //#update-request-context
-      implicit val node = Cluster(system)
+      implicit val node = DistributedData(system).selfUniqueAddress
       val replicator = DistributedData(system).replicator
       val writeTwo = WriteTo(n = 2, timeout = 3.second)
       val Counter1Key = PNCounterKey("counter1")
@@ -160,7 +160,7 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
       def receive: Receive = {
         case "increment" ⇒
           // incoming command to increase the counter
-          val upd = Update(Counter1Key, PNCounter(), writeTwo, request = Some(sender()))(_ + 1)
+          val upd = Update(Counter1Key, PNCounter(), writeTwo, request = Some(sender()))(_ :+ 1)
           replicator ! upd
 
         case UpdateSuccess(Counter1Key, Some(replyTo: ActorRef)) ⇒
@@ -224,7 +224,7 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
       def sender() = self
 
      //#get-request-context
-      implicit val node = Cluster(system)
+      implicit val node = DistributedData(system).selfUniqueAddress
       val replicator = DistributedData(system).replicator
       val readTwo = ReadFrom(n = 2, timeout = 3.second)
       val Counter1Key = PNCounterKey("counter1")
@@ -287,11 +287,12 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
     "demonstrate PNCounter" in {
       def println(o: Any): Unit = ()
       //#pncounter
-      implicit val node = Cluster(system)
+      implicit val node = DistributedData(system).selfUniqueAddress
+
       val c0 = PNCounter.empty
-      val c1 = c0 + 1
-      val c2 = c1 + 7
-      val c3: PNCounter = c2 - 2
+      val c1 = c0 :+ 1
+      val c2 = c1 :+ 7
+      val c3: PNCounter = c2 decrement 2
       println(c3.value) // 6
       //#pncounter
     }
@@ -299,11 +300,11 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
     "demonstrate PNCounterMap" in {
       def println(o: Any): Unit = ()
       //#pncountermap
-      implicit val node = Cluster(system)
+      implicit val node = DistributedData(system).selfUniqueAddress
       val m0 = PNCounterMap.empty[String]
-      val m1 = m0.increment("a", 7)
-      val m2 = m1.decrement("a", 2)
-      val m3 = m2.increment("b", 1)
+      val m1 = m0.increment(node, "a", 7)
+      val m2 = m1.decrement(node, "a", 2)
+      val m3 = m2.increment(node, "b", 1)
       println(m3.get("a")) // 5
       m3.entries.foreach { case (key, value) ⇒ println(s"$key -> $value") }
       //#pncountermap
@@ -323,11 +324,11 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
     "demonstrate ORSet" in {
       def println(o: Any): Unit = ()
       //#orset
-      implicit val node = Cluster(system)
+      implicit val node = DistributedData(system).selfUniqueAddress
       val s0 = ORSet.empty[String]
-      val s1 = s0 + "a"
-      val s2 = s1 + "b"
-      val s3 = s2 - "a"
+      val s1 = s0 :+ "a"
+      val s2 = s1 :+ "b"
+      val s3 = s2 remove "a"
       println(s3.elements) // b
       //#orset
     }
@@ -335,12 +336,12 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
     "demonstrate ORMultiMap" in {
       def println(o: Any): Unit = ()
       //#ormultimap
-      implicit val node = Cluster(system)
+      implicit val node = DistributedData(system).selfUniqueAddress
       val m0 = ORMultiMap.empty[String, Int]
-      val m1 = m0 + ("a" -> Set(1, 2, 3))
-      val m2 = m1.addBinding("a", 4)
-      val m3 = m2.removeBinding("a", 2)
-      val m4 = m3.addBinding("b", 1)
+      val m1 = m0 :+ ("a" -> Set(1, 2, 3))
+      val m2 = m1.addBinding(node, "a", 4)
+      val m3 = m2.removeBinding(node, "a", 2)
+      val m4 = m3.addBinding(node, "b", 1)
       println(m4.entries)
       //#ormultimap
     }
@@ -357,9 +358,9 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
     "demonstrate LWWRegister" in {
       def println(o: Any): Unit = ()
       //#lwwregister
-      implicit val node = Cluster(system)
-      val r1 = LWWRegister("Hello")
-      val r2 = r1.withValue("Hi")
+      implicit val node = DistributedData(system).selfUniqueAddress
+      val r1 = LWWRegister.create("Hello")
+      val r2 = r1.withValueOf("Hi")
       println(s"${r1.value} by ${r1.updatedBy} at ${r1.timestamp}")
       //#lwwregister
       r2.value should be("Hi")
@@ -370,17 +371,17 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
       //#lwwregister-custom-clock
       case class Record(version: Int, name: String, address: String)
 
-      implicit val node = Cluster(system)
+      implicit val node = DistributedData(system).selfUniqueAddress
       implicit val recordClock = new LWWRegister.Clock[Record] {
         override def apply(currentTimestamp: Long, value: Record): Long =
           value.version
       }
 
       val record1 = Record(version = 1, "Alice", "Union Square")
-      val r1 = LWWRegister(record1)
+      val r1 = LWWRegister(node, record1, recordClock)
 
       val record2 = Record(version = 2, "Alice", "Madison Square")
-      val r2 = LWWRegister(record2)
+      val r2 = LWWRegister(node, record2, recordClock)
 
       val r3 = r1.merge(r2)
       println(r3.value)
diff --git a/akka-docs/src/test/scala/docs/ddata/ShoppingCart.scala b/akka-docs/src/test/scala/docs/ddata/ShoppingCart.scala
index f0204fd58f..b74658ed3a 100644
--- a/akka-docs/src/test/scala/docs/ddata/ShoppingCart.scala
+++ b/akka-docs/src/test/scala/docs/ddata/ShoppingCart.scala
@@ -5,10 +5,10 @@ package scala.docs.ddata
 
 import scala.concurrent.duration._
+
 import akka.actor.Actor
 import akka.actor.ActorRef
 import akka.actor.Props
-import akka.cluster.Cluster
 import akka.cluster.ddata.DistributedData
 import akka.cluster.ddata.LWWMap
 import akka.cluster.ddata.LWWMapKey
@@ -38,7 +38,7 @@ class ShoppingCart(userId: String) extends Actor {
   import akka.cluster.ddata.Replicator._
 
   val replicator = DistributedData(context.system).replicator
-  implicit val cluster = Cluster(context.system)
+  implicit val node = DistributedData(context.system).selfUniqueAddress
 
   val DataKey = LWWMapKey[String, LineItem]("cart-" + userId)
@@ -79,8 +79,8 @@ class ShoppingCart(userId: String) extends Actor {
   def updateCart(data: LWWMap[String, LineItem], item: LineItem): LWWMap[String, LineItem] =
     data.get(item.productId) match {
       case Some(LineItem(_, _, existingQuantity)) ⇒
-        data + (item.productId -> item.copy(quantity = existingQuantity + item.quantity))
-      case None ⇒ data + (item.productId -> item)
+        data :+ (item.productId -> item.copy(quantity = existingQuantity + item.quantity))
+      case None ⇒ data :+ (item.productId -> item)
     }
 
   //#remove-item
@@ -92,13 +92,13 @@ class ShoppingCart(userId: String) extends Actor {
 
     case GetSuccess(DataKey, Some(RemoveItem(productId))) ⇒
       replicator ! Update(DataKey, LWWMap(), writeMajority, None) {
-        _ - productId
+        _.remove(node, productId)
       }
 
     case GetFailure(DataKey, Some(RemoveItem(productId))) ⇒
       // ReadMajority failed, fall back to best effort local value
       replicator ! Update(DataKey, LWWMap(), writeMajority, None) {
-        _ - productId
+        _.remove(node, productId)
      }
 
     case NotFound(DataKey, Some(RemoveItem(productId))) ⇒
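For reference, the entries produced by the updated ORMultiMap snippet above work out as follows. This stand-alone sketch — not part of the patch — uses a hard-coded `UniqueAddress` in place of `DistributedData(system).selfUniqueAddress` so it runs without a cluster:

```scala
import akka.actor.Address
import akka.cluster.UniqueAddress
import akka.cluster.ddata.{ ORMultiMap, SelfUniqueAddress }

object ORMultiMapSketch extends App {
  // Stand-in node so the example needs no running cluster.
  implicit val node: SelfUniqueAddress =
    SelfUniqueAddress(UniqueAddress(Address("akka", "Sys", "localhost", 2551), 1L))

  val m0 = ORMultiMap.empty[String, Int]
  val m1 = m0 :+ ("a" -> Set(1, 2, 3))
  val m2 = m1.addBinding(node, "a", 4) // "a" -> Set(1, 2, 3, 4)
  val m3 = m2.removeBinding(node, "a", 2) // "a" -> Set(1, 3, 4)
  val m4 = m3.addBinding(node, "b", 1)

  println(m4.entries) // Map(a -> Set(1, 3, 4), b -> Set(1))
}
```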