diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala index 1cdb864d60..c12496b2ba 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/LWWMap.scala @@ -44,8 +44,19 @@ final class LWWMap[A] private[akka] ( type T = LWWMap[A] + /** + * Scala API: All entries of the map. + */ def entries: Map[String, A] = underlying.entries.map { case (k, r) ⇒ k -> r.value } + /** + * Java API: All entries of the map. + */ + def getEntries(): java.util.Map[String, A] = { + import scala.collection.JavaConverters._ + entries.asJava + } + def get(key: String): Option[A] = underlying.get(key).map(_.value) def contains(key: String): Boolean = underlying.contains(key) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala index 8680e170ad..4da387d284 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PNCounterMap.scala @@ -37,7 +37,10 @@ final class PNCounterMap private[akka] ( def entries: Map[String, BigInt] = underlying.entries.map { case (k, c) ⇒ k -> c.value } /** Java API */ - def getEntries: Map[String, BigInteger] = underlying.entries.map { case (k, c) ⇒ k -> c.value.bigInteger } + def getEntries: java.util.Map[String, BigInteger] = { + import scala.collection.JavaConverters._ + underlying.entries.map { case (k, c) ⇒ k -> c.value.bigInteger }.asJava + } /** * Scala API: The count for a key diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index bfc212f616..5321328caf 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ 
b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -217,6 +217,13 @@ object Replicator { * Java API: `Get` value from local `Replicator`, i.e. `ReadLocal` consistency. */ def this(key: Key[A], consistency: ReadConsistency) = this(key, consistency, None) + + /** + * Java API: `Get` value from local `Replicator`, i.e. `ReadLocal` consistency. + */ + def this(key: Key[A], consistency: ReadConsistency, request: Optional[Any]) = + this(key, consistency, Option(request.orElse(null))) + } sealed abstract class GetResponse[A <: ReplicatedData] extends NoSerializationVerificationNeeded { def key: Key[A] diff --git a/akka-docs/rst/java/code/docs/ddata/DataBot.java b/akka-docs/rst/java/code/docs/ddata/DataBot.java index df95ceaaf0..8802063db6 100644 --- a/akka-docs/rst/java/code/docs/ddata/DataBot.java +++ b/akka-docs/rst/java/code/docs/ddata/DataBot.java @@ -5,7 +5,6 @@ package docs.ddata; //#data-bot import static java.util.concurrent.TimeUnit.SECONDS; - import scala.concurrent.duration.Duration; import scala.concurrent.forkjoin.ThreadLocalRandom; @@ -42,41 +41,47 @@ public class DataBot extends AbstractActor { private final Key> dataKey = ORSetKey.create("key"); + @SuppressWarnings("unchecked") public DataBot() { - receive(ReceiveBuilder. - match(String.class, a -> a.equals(TICK), a -> { - String s = String.valueOf((char) ThreadLocalRandom.current().nextInt(97, 123)); - if (ThreadLocalRandom.current().nextBoolean()) { - // add - log.info("Adding: {}", s); - Update> update = new Update<>( - dataKey, - ORSet.create(), - Replicator.writeLocal(), - curr -> curr.add(node, s)); - replicator.tell(update, self()); - } else { - // remove - log.info("Removing: {}", s); - Update> update = new Update<>( - dataKey, - ORSet.create(), - Replicator.writeLocal(), - curr -> curr.remove(node, s)); - replicator.tell(update, self()); - } - }). - match(UpdateResponse.class, r -> { - // ignore - }). 
- match(Changed.class, c -> c.key().equals(dataKey), c -> { - @SuppressWarnings("unchecked") - Changed> c2 = c; - ORSet data = c2.dataValue(); - log.info("Current elements: {}", data.getElements()); - }). - matchAny(o -> log.info("received unknown message")).build() - ); + receive(ReceiveBuilder + .match(String.class, a -> a.equals(TICK), a -> receiveTick()) + .match(Changed.class, c -> c.key().equals(dataKey), c -> receiveChanged((Changed>) c)) + .match(UpdateResponse.class, r -> receiveUpdateResponse()) + .build()); + } + + + private void receiveTick() { + String s = String.valueOf((char) ThreadLocalRandom.current().nextInt(97, 123)); + if (ThreadLocalRandom.current().nextBoolean()) { + // add + log.info("Adding: {}", s); + Update> update = new Update<>( + dataKey, + ORSet.create(), + Replicator.writeLocal(), + curr -> curr.add(node, s)); + replicator.tell(update, self()); + } else { + // remove + log.info("Removing: {}", s); + Update> update = new Update<>( + dataKey, + ORSet.create(), + Replicator.writeLocal(), + curr -> curr.remove(node, s)); + replicator.tell(update, self()); + } + } + + + private void receiveChanged(Changed> c) { + ORSet data = c.dataValue(); + log.info("Current elements: {}", data.getElements()); + } + + private void receiveUpdateResponse() { + // ignore + } diff --git a/akka-docs/rst/java/distributed-data.rst b/akka-docs/rst/java/distributed-data.rst index 14ebea1956..e9299ecb43 100644 --- a/akka-docs/rst/java/distributed-data.rst +++ b/akka-docs/rst/java/distributed-data.rst @@ -176,13 +176,11 @@ to 4 nodes and reads from 4 nodes. Here is an example of using ``writeMajority`` and ``readMajority``: -**FIXME convert this example to Java** +.. includecode:: ../../../akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ShoppingCart.java#read-write-majority -.. includecode:: ../../../akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala#read-write-majority +..
includecode:: ../../../akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ShoppingCart.java#get-cart -.. includecode:: ../../../akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala#get-cart - -.. includecode:: ../../../akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala#add-item +.. includecode:: ../../../akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ShoppingCart.java#add-item In some rare cases, when performing an ``Update`` it is needed to first try to fetch latest data from other nodes. That can be done by first sending a ``Get`` with ``ReadMajority`` and then continue with @@ -194,9 +192,7 @@ performed (hence the name observed-removed set). The following example illustrates how to do that: -**FIXME convert this example to Java** - -.. includecode:: ../../../akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala#remove-item +.. includecode:: ../../../akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ShoppingCart.java#remove-item .. warning:: @@ -446,10 +442,8 @@ cluster. Data types that need pruning have to implement the ``RemovedNodePruning Samples ======= -**FIXME convert these sampes to Java** - Several interesting samples are included and described in the `Typesafe Activator `_ -tutorial named `Akka Distributed Data Samples with Scala `_. +tutorial named `Akka Distributed Data Samples with Java `_. 
* Low Latency Voting Service * Highly Available Shopping Cart diff --git a/akka-samples/akka-sample-distributed-data-java/.gitignore b/akka-samples/akka-sample-distributed-data-java/.gitignore new file mode 100644 index 0000000000..660c959e44 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/.gitignore @@ -0,0 +1,17 @@ +*# +*.iml +*.ipr +*.iws +*.pyc +*.tm.epoch +*.vim +*-shim.sbt +.idea/ +/project/plugins/project +project/boot +target/ +/logs +.cache +.classpath +.project +.settings \ No newline at end of file diff --git a/akka-samples/akka-sample-distributed-data-java/COPYING b/akka-samples/akka-sample-distributed-data-java/COPYING new file mode 100644 index 0000000000..0e259d42c9 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/COPYING @@ -0,0 +1,121 @@ +Creative Commons Legal Code + +CC0 1.0 Universal + + CREATIVE COMMONS CORPORATION IS NOT A LAW FIRM AND DOES NOT PROVIDE + LEGAL SERVICES. DISTRIBUTION OF THIS DOCUMENT DOES NOT CREATE AN + ATTORNEY-CLIENT RELATIONSHIP. CREATIVE COMMONS PROVIDES THIS + INFORMATION ON AN "AS-IS" BASIS. CREATIVE COMMONS MAKES NO WARRANTIES + REGARDING THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS + PROVIDED HEREUNDER, AND DISCLAIMS LIABILITY FOR DAMAGES RESULTING FROM + THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS PROVIDED + HEREUNDER. + +Statement of Purpose + +The laws of most jurisdictions throughout the world automatically confer +exclusive Copyright and Related Rights (defined below) upon the creator +and subsequent owner(s) (each and all, an "owner") of an original work of +authorship and/or a database (each, a "Work"). 
+ +Certain owners wish to permanently relinquish those rights to a Work for +the purpose of contributing to a commons of creative, cultural and +scientific works ("Commons") that the public can reliably and without fear +of later claims of infringement build upon, modify, incorporate in other +works, reuse and redistribute as freely as possible in any form whatsoever +and for any purposes, including without limitation commercial purposes. +These owners may contribute to the Commons to promote the ideal of a free +culture and the further production of creative, cultural and scientific +works, or to gain reputation or greater distribution for their Work in +part through the use and efforts of others. + +For these and/or other purposes and motivations, and without any +expectation of additional consideration or compensation, the person +associating CC0 with a Work (the "Affirmer"), to the extent that he or she +is an owner of Copyright and Related Rights in the Work, voluntarily +elects to apply CC0 to the Work and publicly distribute the Work under its +terms, with knowledge of his or her Copyright and Related Rights in the +Work and the meaning and intended legal effect of CC0 on those rights. + +1. Copyright and Related Rights. A Work made available under CC0 may be +protected by copyright and related or neighboring rights ("Copyright and +Related Rights"). Copyright and Related Rights include, but are not +limited to, the following: + + i. the right to reproduce, adapt, distribute, perform, display, + communicate, and translate a Work; + ii. moral rights retained by the original author(s) and/or performer(s); +iii. publicity and privacy rights pertaining to a person's image or + likeness depicted in a Work; + iv. rights protecting against unfair competition in regards to a Work, + subject to the limitations in paragraph 4(a), below; + v. rights protecting the extraction, dissemination, use and reuse of data + in a Work; + vi. 
database rights (such as those arising under Directive 96/9/EC of the + European Parliament and of the Council of 11 March 1996 on the legal + protection of databases, and under any national implementation + thereof, including any amended or successor version of such + directive); and +vii. other similar, equivalent or corresponding rights throughout the + world based on applicable law or treaty, and any national + implementations thereof. + +2. Waiver. To the greatest extent permitted by, but not in contravention +of, applicable law, Affirmer hereby overtly, fully, permanently, +irrevocably and unconditionally waives, abandons, and surrenders all of +Affirmer's Copyright and Related Rights and associated claims and causes +of action, whether now known or unknown (including existing as well as +future claims and causes of action), in the Work (i) in all territories +worldwide, (ii) for the maximum duration provided by applicable law or +treaty (including future time extensions), (iii) in any current or future +medium and for any number of copies, and (iv) for any purpose whatsoever, +including without limitation commercial, advertising or promotional +purposes (the "Waiver"). Affirmer makes the Waiver for the benefit of each +member of the public at large and to the detriment of Affirmer's heirs and +successors, fully intending that such Waiver shall not be subject to +revocation, rescission, cancellation, termination, or any other legal or +equitable action to disrupt the quiet enjoyment of the Work by the public +as contemplated by Affirmer's express Statement of Purpose. + +3. Public License Fallback. Should any part of the Waiver for any reason +be judged legally invalid or ineffective under applicable law, then the +Waiver shall be preserved to the maximum extent permitted taking into +account Affirmer's express Statement of Purpose. 
In addition, to the +extent the Waiver is so judged Affirmer hereby grants to each affected +person a royalty-free, non transferable, non sublicensable, non exclusive, +irrevocable and unconditional license to exercise Affirmer's Copyright and +Related Rights in the Work (i) in all territories worldwide, (ii) for the +maximum duration provided by applicable law or treaty (including future +time extensions), (iii) in any current or future medium and for any number +of copies, and (iv) for any purpose whatsoever, including without +limitation commercial, advertising or promotional purposes (the +"License"). The License shall be deemed effective as of the date CC0 was +applied by Affirmer to the Work. Should any part of the License for any +reason be judged legally invalid or ineffective under applicable law, such +partial invalidity or ineffectiveness shall not invalidate the remainder +of the License, and in such case Affirmer hereby affirms that he or she +will not (i) exercise any of his or her remaining Copyright and Related +Rights in the Work or (ii) assert any associated claims and causes of +action with respect to the Work, in either case contrary to Affirmer's +express Statement of Purpose. + +4. Limitations and Disclaimers. + + a. No trademark or patent rights held by Affirmer are waived, abandoned, + surrendered, licensed or otherwise affected by this document. + b. Affirmer offers the Work as-is and makes no representations or + warranties of any kind concerning the Work, express, implied, + statutory or otherwise, including without limitation warranties of + title, merchantability, fitness for a particular purpose, non + infringement, or the absence of latent or other defects, accuracy, or + the present or absence of errors, whether or not discoverable, all to + the greatest extent permissible under applicable law. + c. 
Affirmer disclaims responsibility for clearing rights of other persons + that may apply to the Work or any use thereof, including without + limitation any person's Copyright and Related Rights in the Work. + Further, Affirmer disclaims responsibility for obtaining any necessary + consents, permissions or other rights required for any use of the + Work. + d. Affirmer understands and acknowledges that Creative Commons is not a + party to this document and has no duty or obligation with respect to + this CC0 or use of the Work. diff --git a/akka-samples/akka-sample-distributed-data-java/LICENSE b/akka-samples/akka-sample-distributed-data-java/LICENSE new file mode 100644 index 0000000000..287f8dd7fa --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/LICENSE @@ -0,0 +1,10 @@ +Activator Template by Typesafe + +Licensed under Public Domain (CC0) + +To the extent possible under law, the person who associated CC0 with +this Activator Template has waived all copyright and related or neighboring +rights to this Activator Template. + +You should have received a copy of the CC0 legalcode along with this +work. If not, see .
diff --git a/akka-samples/akka-sample-distributed-data-java/activator.properties b/akka-samples/akka-sample-distributed-data-java/activator.properties new file mode 100644 index 0000000000..edde6809fb --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/activator.properties @@ -0,0 +1,7 @@ +name=akka-sample-distributed-data-java +title=Akka Distributed Data Samples with Java +description=Akka Distributed Data Samples with Java +tags=akka,cluster,java,sample,distributed-data +authorName=Akka Team +authorLink=http://akka.io/ +sourceLink=https://github.com/akka/akka diff --git a/akka-samples/akka-sample-distributed-data-java/build.sbt b/akka-samples/akka-sample-distributed-data-java/build.sbt new file mode 100644 index 0000000000..188d820a6b --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/build.sbt @@ -0,0 +1,47 @@ +import com.typesafe.sbt.SbtMultiJvm +import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm + +val akkaVersion = "2.4-SNAPSHOT" + +val project = Project( + id = "akka-sample-distributed-data-java", + base = file("."), + settings = Project.defaultSettings ++ SbtMultiJvm.multiJvmSettings ++ Seq( + name := "akka-sample-distributed-data-java", + version := "2.4-SNAPSHOT", + scalaVersion := "2.11.6", + scalacOptions in Compile ++= Seq("-encoding", "UTF-8", "-target:jvm-1.8", "-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint"), + javacOptions in Compile ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint:unchecked", "-Xlint:deprecation", "-Xdiags:verbose"), + javacOptions in doc in Compile := Seq("-source", "1.8"), + libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-actor" % akkaVersion, + "com.typesafe.akka" %% "akka-remote" % akkaVersion, + "com.typesafe.akka" %% "akka-cluster" % akkaVersion, + "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion, + "com.typesafe.akka" %% "akka-multi-node-testkit" % akkaVersion, + "org.scalatest" %% "scalatest" % "2.2.1" % "test"), + 
javaOptions in run ++= Seq( + "-Xms128m", "-Xmx1024m"), + Keys.fork in run := true, + // make sure that MultiJvm test are compiled by the default test compilation + compile in MultiJvm <<= (compile in MultiJvm) triggeredBy (compile in Test), + // disable parallel tests + parallelExecution in Test := false, + // make sure that MultiJvm tests are executed by the default test target, + // and combine the results from ordinary test and multi-jvm tests + executeTests in Test <<= (executeTests in Test, executeTests in MultiJvm) map { + case (testResults, multiNodeResults) => + val overall = + if (testResults.overall.id < multiNodeResults.overall.id) + multiNodeResults.overall + else + testResults.overall + Tests.Output(overall, + testResults.events ++ multiNodeResults.events, + testResults.summaries ++ multiNodeResults.summaries) + } + ) +) configs (MultiJvm) + + +fork in run := true \ No newline at end of file diff --git a/akka-samples/akka-sample-distributed-data-java/project/build.properties b/akka-samples/akka-sample-distributed-data-java/project/build.properties new file mode 100644 index 0000000000..748703f770 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.7 diff --git a/akka-samples/akka-sample-distributed-data-java/project/plugins.sbt b/akka-samples/akka-sample-distributed-data-java/project/plugins.sbt new file mode 100644 index 0000000000..c3e7d797de --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/project/plugins.sbt @@ -0,0 +1,4 @@ + +resolvers += Classpaths.typesafeResolver + +addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.3.8") diff --git a/akka-samples/akka-sample-distributed-data-java/project/sbt-ui.sbt b/akka-samples/akka-sample-distributed-data-java/project/sbt-ui.sbt new file mode 100644 index 0000000000..7c28b97b34 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/project/sbt-ui.sbt @@ -0,0 +1,3 @@ +// This plugin represents 
functionality that is to be added to sbt in the future + +addSbtPlugin("org.scala-sbt" % "sbt-core-next" % "0.1.1") \ No newline at end of file diff --git a/akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ReplicatedCache.java b/akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ReplicatedCache.java new file mode 100644 index 0000000000..e02c8a250f --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ReplicatedCache.java @@ -0,0 +1,163 @@ +package sample.distributeddata; + +import static akka.cluster.ddata.Replicator.readLocal; +import static akka.cluster.ddata.Replicator.writeLocal; + +import java.util.Optional; +import scala.Option; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.cluster.Cluster; +import akka.cluster.ddata.DistributedData; +import akka.cluster.ddata.Key; +import akka.cluster.ddata.LWWMap; +import akka.cluster.ddata.LWWMapKey; +import akka.cluster.ddata.Replicator.Get; +import akka.cluster.ddata.Replicator.GetSuccess; +import akka.cluster.ddata.Replicator.NotFound; +import akka.cluster.ddata.Replicator.Update; +import akka.cluster.ddata.Replicator.UpdateResponse; +import akka.japi.pf.ReceiveBuilder; + +@SuppressWarnings("unchecked") +public class ReplicatedCache extends AbstractActor { + + static class Request { + public final String key; + public final ActorRef replyTo; + + public Request(String key, ActorRef replyTo) { + this.key = key; + this.replyTo = replyTo; + } + } + + public static class PutInCache { + public final String key; + public final Object value; + + public PutInCache(String key, Object value) { + this.key = key; + this.value = value; + } + } + + public static class GetFromCache { + public final String key; + + public GetFromCache(String key) { + this.key = key; + } + } + + public static class Cached { + public final String key; + public final Optional value; 
+ + public Cached(String key, Optional value) { + this.key = key; + this.value = value; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((key == null) ? 0 : key.hashCode()); + result = prime * result + ((value == null) ? 0 : value.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Cached other = (Cached) obj; + if (key == null) { + if (other.key != null) + return false; + } else if (!key.equals(other.key)) + return false; + if (value == null) { + if (other.value != null) + return false; + } else if (!value.equals(other.value)) + return false; + return true; + } + + @Override + public String toString() { + return "Cached [key=" + key + ", value=" + value + "]"; + } + + } + + public static class Evict { + public final String key; + + public Evict(String key) { + this.key = key; + } + } + + public static Props props() { + return Props.create(ReplicatedCache.class); + } + + private final ActorRef replicator = DistributedData.get(context().system()).replicator(); + private final Cluster node = Cluster.get(context().system()); + + public ReplicatedCache() { + receive(ReceiveBuilder + .match(PutInCache.class, cmd -> receivePutInCache(cmd.key, cmd.value)) + .match(Evict.class, cmd -> receiveEvict(cmd.key)) + .match(GetFromCache.class, cmd -> receiveGetFromCache(cmd.key)) + .match(GetSuccess.class, g -> receiveGetSuccess((GetSuccess>) g)) + .match(NotFound.class, n -> receiveNotFound((NotFound>) n)) + .match(UpdateResponse.class, u -> {}) + .build()); + } + + private void receivePutInCache(String key, Object value) { + Update> update = new Update<>(dataKey(key), LWWMap.create(), writeLocal(), + curr -> curr.put(node, key, value)); + replicator.tell(update, self()); + } + + private void receiveEvict(String key) { + Update> update = new 
Update<>(dataKey(key), LWWMap.create(), writeLocal(), + curr -> curr.remove(node, key)); + replicator.tell(update, self()); + } + + private void receiveGetFromCache(String key) { + Optional ctx = Optional.of(new Request(key, sender())); + Get> get = new Get<>(dataKey(key), readLocal(), ctx); + replicator.tell(get, self()); + } + + private void receiveGetSuccess(GetSuccess> g) { + Request req = (Request) g.getRequest().get(); + Option valueOption = g.dataValue().get(req.key); + Optional valueOptional = Optional.ofNullable(valueOption.isDefined() ? valueOption.get() : null); + req.replyTo.tell(new Cached(req.key, valueOptional), self()); + } + + private void receiveNotFound(NotFound> n) { + Request req = (Request) n.getRequest().get(); + req.replyTo.tell(new Cached(req.key, Optional.empty()), self()); + } + + private Key> dataKey(String entryKey) { + return LWWMapKey.create("cache-" + Math.abs(entryKey.hashCode()) % 100); + } + + +} \ No newline at end of file diff --git a/akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ReplicatedMetrics.java b/akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ReplicatedMetrics.java new file mode 100644 index 0000000000..012903ee35 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ReplicatedMetrics.java @@ -0,0 +1,167 @@ +package sample.distributeddata; + +import static akka.cluster.ddata.Replicator.writeLocal; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import scala.concurrent.duration.FiniteDuration; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.Address; +import akka.actor.Cancellable; +import akka.actor.Props; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent; +import 
akka.cluster.ClusterEvent.MemberRemoved; +import akka.cluster.ClusterEvent.MemberUp; +import akka.cluster.ddata.DistributedData; +import akka.cluster.ddata.Key; +import akka.cluster.ddata.LWWMap; +import akka.cluster.ddata.LWWMapKey; +import akka.cluster.ddata.Replicator.Changed; +import akka.cluster.ddata.Replicator.Subscribe; +import akka.cluster.ddata.Replicator.Update; +import akka.cluster.ddata.Replicator.UpdateResponse; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.japi.pf.ReceiveBuilder; + +@SuppressWarnings("unchecked") +public class ReplicatedMetrics extends AbstractActor { + + public static Props props(FiniteDuration measureInterval, FiniteDuration cleanupInterval) { + return Props.create(ReplicatedMetrics.class, measureInterval, cleanupInterval); + } + + public static class UsedHeap { + public Map percentPerNode; + + public UsedHeap(Map percentPerNode) { + this.percentPerNode = percentPerNode; + } + } + + private static final String TICK = "tick"; + private static final String CLEANUP = "cleanup"; + + public static String nodeKey(Address address) { + return address.host().get() + ":" + address.port().get(); + } + + private final ActorRef replicator = DistributedData.get(context().system()).replicator(); + private final Cluster node = Cluster.get(context().system()); + private final String selfNodeKey = nodeKey(node.selfAddress()); + private final MemoryMXBean memoryMBean = ManagementFactory.getMemoryMXBean(); + private final LoggingAdapter log = Logging.getLogger(context().system(), this); + + private final Key> usedHeapKey = LWWMapKey.create("usedHeap"); + private final Key> maxHeapKey = LWWMapKey.create("maxHeap"); + + private final Cancellable tickTask; + private final Cancellable cleanupTask; + + private Map maxHeap = new HashMap<>(); + private final Set nodesInCluster = new HashSet<>(); + + @Override + public void preStart() { + replicator.tell(new Subscribe<>(maxHeapKey, self()), ActorRef.noSender()); + 
replicator.tell(new Subscribe<>(usedHeapKey, self()), ActorRef.noSender()); + node.subscribe(self(), ClusterEvent.initialStateAsEvents(), + MemberUp.class, MemberRemoved.class); + } + + @Override + public void postStop() throws Exception { + tickTask.cancel(); + cleanupTask.cancel(); + node.unsubscribe(self()); + super.postStop(); + } + + public ReplicatedMetrics(FiniteDuration measureInterval, FiniteDuration cleanupInterval) { + tickTask = context().system().scheduler().schedule(measureInterval, measureInterval, + self(), TICK, context().dispatcher(), self()); + cleanupTask = context().system().scheduler().schedule(cleanupInterval, cleanupInterval, + self(), CLEANUP, context().dispatcher(), self()); + + receive(ReceiveBuilder + .matchEquals(TICK, t -> receiveTick()) + .match(Changed.class, c -> c.key().equals(maxHeapKey), c -> receiveMaxHeapChanged((Changed>) c)) + .match(Changed.class, c -> c.key().equals(usedHeapKey), c -> receiveUsedHeapChanged((Changed>) c)) + .match(UpdateResponse.class, u -> {}) + .match(MemberUp.class, m -> receiveMemberUp(m.member().address())) + .match(MemberRemoved.class, m -> receiveMemberRemoved(m.member().address())) + .matchEquals(CLEANUP, c -> receiveCleanup()) + .build()); + } + + private void receiveTick() { + MemoryUsage heap = memoryMBean.getHeapMemoryUsage(); + long used = heap.getUsed(); + long max = heap.getMax(); + + Update> update1 = new Update<>(usedHeapKey, LWWMap.create(), writeLocal(), + curr -> curr.put(node, selfNodeKey, used)); + replicator.tell(update1, self()); + + Update> update2 = new Update<>(maxHeapKey, LWWMap.create(), writeLocal(), curr -> { + if (curr.contains(selfNodeKey) && curr.get(selfNodeKey).get().longValue() == max) + return curr; // unchanged + else + return curr.put(node, selfNodeKey, max); + }); + replicator.tell(update2, self()); + } + + private void receiveMaxHeapChanged(Changed> c) { + maxHeap = c.dataValue().getEntries(); + } + + private void receiveUsedHeapChanged(Changed> c) { + Map 
percentPerNode = new HashMap<>(); + for (Map.Entry entry : c.dataValue().getEntries().entrySet()) { + if (maxHeap.containsKey(entry.getKey())) { + double percent = (entry.getValue().doubleValue() / maxHeap.get(entry.getKey())) * 100.0; + percentPerNode.put(entry.getKey(), percent); + } + } + UsedHeap usedHeap = new UsedHeap(percentPerNode); + log.debug("Node {} observed:\n{}", node, usedHeap); + context().system().eventStream().publish(usedHeap); + } + + private void receiveMemberUp(Address address) { + nodesInCluster.add(nodeKey(address)); + } + + private void receiveMemberRemoved(Address address) { + nodesInCluster.remove(nodeKey(address)); + if (address.equals(node.selfAddress())) + context().stop(self()); + } + + private void receiveCleanup() { + Update> update1 = new Update<>(usedHeapKey, LWWMap.create(), writeLocal(), curr -> cleanup(curr)); + replicator.tell(update1, self()); + Update> update2 = new Update<>(maxHeapKey, LWWMap.create(), writeLocal(), curr -> cleanup(curr)); + replicator.tell(update2, self()); + } + + private LWWMap cleanup(LWWMap data) { + LWWMap result = data; + log.info("Cleanup " + nodesInCluster + " -- " + data.getEntries().keySet()); + for (String k : data.getEntries().keySet()) { + if (!nodesInCluster.contains(k)) { + result = result.remove(node, k); + } + } + return result; + } + +} \ No newline at end of file diff --git a/akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ServiceRegistry.java b/akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ServiceRegistry.java new file mode 100644 index 0000000000..8d0dd71912 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ServiceRegistry.java @@ -0,0 +1,248 @@ +package sample.distributeddata; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import scala.PartialFunction; +import 
scala.runtime.BoxedUnit; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.Address; +import akka.actor.Props; +import akka.actor.Terminated; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent; +import akka.cluster.ddata.DistributedData; +import akka.cluster.ddata.GSet; +import akka.cluster.ddata.GSetKey; +import akka.cluster.ddata.Key; +import akka.cluster.ddata.ORSet; +import akka.cluster.ddata.Replicator; +import akka.cluster.ddata.Replicator.Changed; +import akka.cluster.ddata.Replicator.Subscribe; +import akka.cluster.ddata.Replicator.Update; +import akka.cluster.ddata.Replicator.UpdateResponse; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.japi.pf.ReceiveBuilder; + +@SuppressWarnings("unchecked") +public class ServiceRegistry extends AbstractActor { + + /** + * Register a `service` with a `name`. Several services can be registered with + * the same `name`. It will be removed when it is terminated. + */ + public static class Register { + public final String name; + public final ActorRef service; + + public Register(String name, ActorRef service) { + this.name = name; + this.service = service; + } + } + + /** + * Lookup services registered for a `name`. {@link Bindings} will be sent to + * `sender()`. + */ + public static class Lookup { + public final String name; + + public Lookup(String name) { + this.name = name; + } + } + + /** + * Reply for {@link Lookup} + */ + public static class Bindings { + public final String name; + public final Set services; + + public Bindings(String name, Set services) { + this.name = name; + this.services = services; + } + } + + /** + * Published to `ActorSystem.eventStream` when services are changed. 
+ */ + public static class BindingChanged { + public final String name; + public final Set services; + + public BindingChanged(String name, Set services) { + this.name = name; + this.services = services; + } + } + + public static class ServiceKey extends Key> { + private static final long serialVersionUID = 1L; + + public ServiceKey(String serviceName) { + super(serviceName); + } + } + + public static Props props() { + return Props.create(ServiceRegistry.class); + } + + private final LoggingAdapter log = Logging.getLogger(context().system(), this); + private final ActorRef replicator = DistributedData.get(context().system()).replicator(); + private final Cluster node = Cluster.get(context().system()); + + + private final Key> allServicesKey = GSetKey.create("service-keys"); + + private Set keys = new HashSet<>(); + private final Map> services = new HashMap<>(); + private boolean leader = false; + + public ServiceRegistry() { + receive(matchCommands() + .orElse(matchChanged()) + .orElse(matchWatch()) + .orElse(matchOther())); + } + + @Override + public void preStart() { + replicator.tell(new Subscribe<>(allServicesKey, self()), ActorRef.noSender()); + node.subscribe(self(), ClusterEvent.initialStateAsEvents(), ClusterEvent.LeaderChanged.class); + } + + @Override + public void postStop() throws Exception { + node.unsubscribe(self()); + super.postStop(); + } + + private PartialFunction matchCommands() { + return ReceiveBuilder + .match(Register.class, r -> receiveRegister(r)) + .match(Lookup.class, l -> receiveLookup(l)) + .build(); + } + + private ServiceKey serviceKey(String serviceName) { + return new ServiceKey("service:" + serviceName); + } + + + private void receiveRegister(Register r) { + ServiceKey dKey = serviceKey(r.name); + // store the service names in a separate GSet to be able to + // get notifications of new names + if (!keys.contains(dKey)) { + Update> update1 = new Update<>(allServicesKey, GSet.create(), Replicator.writeLocal(), + curr -> 
curr.add(dKey)); + replicator.tell(update1, self()); + } + + Update> update2 = new Update<>(dKey, ORSet.create(), Replicator.writeLocal(), + curr -> curr.add(node, r.service)); + replicator.tell(update2, self()); + } + + private void receiveLookup(Lookup l) { + sender().tell(new Bindings(l.name, services.getOrDefault(l.name, Collections.emptySet())), self()); + } + + private PartialFunction matchChanged() { + return ReceiveBuilder + .match(Changed.class, c -> { + if (c.key().equals(allServicesKey)) + receiveAllServicesKeysChanged((Changed>) c); + else if (c.key() instanceof ServiceKey) + receiveServiceChanged((Changed>) c); + }) + .build(); + } + + private void receiveAllServicesKeysChanged(Changed> c) { + Set newKeys = c.dataValue().getElements(); + Set diff = new HashSet<>(newKeys); + diff.removeAll(keys); + log.debug("Services changed, added: {}, all: {}", diff, newKeys); + diff.forEach(dKey -> { + // subscribe to get notifications of when services with this name are added or removed + replicator.tell(new Subscribe>(dKey, self()), self()); + }); + keys = newKeys; + + } + + private void receiveServiceChanged(Changed> c) { + String name = c.key().id().split(":")[1]; + Set newServices = c.get(serviceKey(name)).getElements(); + log.debug("Services changed for name [{}]: {}", name, newServices); + services.put(name, newServices); + context().system().eventStream().publish(new BindingChanged(name, newServices)); + if (leader) { + newServices.forEach(ref -> context().watch(ref)); // watch is idempotent + } + } + + private PartialFunction matchWatch() { + return ReceiveBuilder + .match(ClusterEvent.LeaderChanged.class, c -> c.getLeader() != null, + c -> receiveLeaderChanged(c.getLeader())) + .match(Terminated.class, t -> receiveTerminated(t.actor())) + .build(); + } + + private void receiveLeaderChanged(Address newLeader) { + // Let one node (the leader) be responsible for removal of terminated services + // to avoid redundant work and too many death watch 
notifications. + // It is not critical to only do it from one node. + boolean wasLeader = leader; + leader = newLeader.equals(node.selfAddress()); + // when used with many (> 500) services you must increase the system message buffer + // `akka.remote.system-message-buffer-size` + if (!wasLeader && leader) { + for (Set refs : services.values()) { + for (ActorRef ref : refs) { + context().watch(ref); + } + } + } else if (wasLeader && !leader) { + for (Set refs : services.values()) { + for (ActorRef ref : refs) { + context().unwatch(ref); + } + } + } + } + + private void receiveTerminated(ActorRef ref) { + for (Map.Entry> entry : services.entrySet()) { + if (entry.getValue().contains(ref)) { + log.debug("Service with name [{}] terminated: {}", entry.getKey(), ref); + ServiceKey dKey = serviceKey(entry.getKey()); + Update> update = new Update<>(dKey, ORSet.create(), Replicator.writeLocal(), + curr -> curr.remove(node, ref)); + replicator.tell(update, self()); + } + } + } + + private PartialFunction matchOther() { + return ReceiveBuilder + .match(UpdateResponse.class, u -> { + // ok + }) + .build(); + } + + + +} \ No newline at end of file diff --git a/akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ShoppingCart.java b/akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ShoppingCart.java new file mode 100644 index 0000000000..827617421f --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ShoppingCart.java @@ -0,0 +1,277 @@ +package sample.distributeddata; + +import static java.util.concurrent.TimeUnit.SECONDS; +import java.io.Serializable; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import scala.PartialFunction; +import scala.concurrent.duration.Duration; +import scala.runtime.BoxedUnit; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.cluster.Cluster; +import 
akka.cluster.ddata.DistributedData; +import akka.cluster.ddata.Key; +import akka.cluster.ddata.LWWMap; +import akka.cluster.ddata.LWWMapKey; +import akka.cluster.ddata.Replicator; +import akka.cluster.ddata.Replicator.GetFailure; +import akka.cluster.ddata.Replicator.GetResponse; +import akka.cluster.ddata.Replicator.GetSuccess; +import akka.cluster.ddata.Replicator.NotFound; +import akka.cluster.ddata.Replicator.ReadConsistency; +import akka.cluster.ddata.Replicator.ReadMajority; +import akka.cluster.ddata.Replicator.Update; +import akka.cluster.ddata.Replicator.UpdateFailure; +import akka.cluster.ddata.Replicator.UpdateSuccess; +import akka.cluster.ddata.Replicator.UpdateTimeout; +import akka.cluster.ddata.Replicator.WriteConsistency; +import akka.cluster.ddata.Replicator.WriteMajority; +import akka.japi.pf.ReceiveBuilder; + +@SuppressWarnings("unchecked") +public class ShoppingCart extends AbstractActor { + + //#read-write-majority + private final WriteConsistency writeMajority = + new WriteMajority(Duration.create(3, SECONDS)); + private final static ReadConsistency readMajority = + new ReadMajority(Duration.create(3, SECONDS)); + //#read-write-majority + + public static final String GET_CART = "getCart"; + + public static class AddItem { + public final LineItem item; + + public AddItem(LineItem item) { + this.item = item; + } + } + + public static class RemoveItem { + public final String productId; + + public RemoveItem(String productId) { + this.productId = productId; + } + } + + public static class Cart { + public final Set items; + + public Cart(Set items) { + this.items = items; + } + } + + public static class LineItem implements Serializable { + private static final long serialVersionUID = 1L; + public final String productId; + public final String title; + public final int quantity; + + public LineItem(String productId, String title, int quantity) { + this.productId = productId; + this.title = title; + this.quantity = quantity; + } + + @Override + public 
int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((productId == null) ? 0 : productId.hashCode()); + result = prime * result + quantity; + result = prime * result + ((title == null) ? 0 : title.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + LineItem other = (LineItem) obj; + if (productId == null) { + if (other.productId != null) + return false; + } else if (!productId.equals(other.productId)) + return false; + if (quantity != other.quantity) + return false; + if (title == null) { + if (other.title != null) + return false; + } else if (!title.equals(other.title)) + return false; + return true; + } + + @Override + public String toString() { + return "LineItem [productId=" + productId + ", title=" + title + ", quantity=" + quantity + "]"; + } + + } + + public static Props props(String userId) { + return Props.create(ShoppingCart.class, userId); + } + + private final ActorRef replicator = DistributedData.get(context().system()).replicator(); + private final Cluster node = Cluster.get(context().system()); + + @SuppressWarnings("unused") + private final String userId; + private final Key> dataKey; + + public ShoppingCart(String userId) { + this.userId = userId; + this.dataKey = LWWMapKey.create("cart-" + userId); + + receive(matchGetCart() + .orElse(matchAddItem()) + .orElse(matchRemoveItem()) + .orElse(matchOther())); + } + + + //#get-cart + private PartialFunction matchGetCart() { + return ReceiveBuilder + .matchEquals((GET_CART), + s -> receiveGetCart()) + .match(GetSuccess.class, g -> isResponseToGetCart(g), + g -> receiveGetSuccess((GetSuccess>) g)) + .match(NotFound.class, n -> isResponseToGetCart(n), + n -> receiveNotFound((NotFound>) n)) + .match(GetFailure.class, f -> isResponseToGetCart(f), + f -> receiveGetFailure((GetFailure>) f)) + .build(); + } + + + private 
void receiveGetCart() { + Optional ctx = Optional.of(sender()); + replicator.tell(new Replicator.Get>(dataKey, readMajority, ctx), + self()); + } + + private boolean isResponseToGetCart(GetResponse response) { + return response.key().equals(dataKey) && + (response.getRequest().orElse(null) instanceof ActorRef); + } + + private void receiveGetSuccess(GetSuccess> g) { + Set items = new HashSet<>(g.dataValue().getEntries().values()); + ActorRef replyTo = (ActorRef) g.getRequest().get(); + replyTo.tell(new Cart(items), self()); + } + + private void receiveNotFound(NotFound> n) { + ActorRef replyTo = (ActorRef) n.getRequest().get(); + replyTo.tell(new Cart(new HashSet<>()), self()); + } + + private void receiveGetFailure(GetFailure> f) { + // ReadMajority failure, try again with local read + Optional ctx = Optional.of(sender()); + replicator.tell(new Replicator.Get>(dataKey, Replicator.readLocal(), + ctx), self()); + } + //#get-cart + + //#add-item + private PartialFunction matchAddItem() { + return ReceiveBuilder + .match(AddItem.class, r -> receiveAddItem(r)) + .build(); + } + + private void receiveAddItem(AddItem add) { + Update> update = new Update<>(dataKey, LWWMap.create(), writeMajority, + cart -> updateCart(cart, add.item)); + replicator.tell(update, self()); + } + + //#add-item + + private LWWMap updateCart(LWWMap data, LineItem item) { + if (data.contains(item.productId)) { + LineItem existingItem = data.get(item.productId).get(); + int newQuantity = existingItem.quantity + item.quantity; + LineItem newItem = new LineItem(item.productId, item.title, newQuantity); + return data.put(node, item.productId, newItem); + } else { + return data.put(node, item.productId, item); + } + } + + private PartialFunction matchRemoveItem() { + return ReceiveBuilder + .match(RemoveItem.class, r -> receiveRemoveItem(r)) + .match(GetSuccess.class, g -> isResponseToRemoveItem(g), + g -> receiveRemoveItemGetSuccess((GetSuccess>) g)) + .match(GetFailure.class, f -> 
isResponseToRemoveItem(f), + f -> receiveRemoveItemGetFailure((GetFailure>) f)) + .match(NotFound.class, n -> isResponseToRemoveItem(n), n -> {/* nothing to remove */}) + .build(); + } + + //#remove-item + private void receiveRemoveItem(RemoveItem rm) { + // Try to fetch latest from a majority of nodes first, since ORMap + // remove must have seen the item to be able to remove it. + Optional ctx = Optional.of(rm); + replicator.tell(new Replicator.Get>(dataKey, readMajority, ctx), + self()); + } + + private void receiveRemoveItemGetSuccess(GetSuccess> g) { + RemoveItem rm = (RemoveItem) g.getRequest().get(); + removeItem(rm.productId); + } + + + private void receiveRemoveItemGetFailure(GetFailure> f) { + // ReadMajority failed, fall back to best effort local value + RemoveItem rm = (RemoveItem) f.getRequest().get(); + removeItem(rm.productId); + } + + private void removeItem(String productId) { + Update> update = new Update<>(dataKey, LWWMap.create(), writeMajority, + cart -> cart.remove(node, productId)); + replicator.tell(update, self()); + } + + private boolean isResponseToRemoveItem(GetResponse response) { + return response.key().equals(dataKey) && + (response.getRequest().orElse(null) instanceof RemoveItem); + } + //#remove-item + + private PartialFunction matchOther() { + return ReceiveBuilder + .match(UpdateSuccess.class, u -> { + // ok + }) + .match(UpdateTimeout.class, t -> { + // will eventually be replicated + }) + .match(UpdateFailure.class, f -> { + throw new IllegalStateException("Unexpected failure: " + f); + }) + .build(); + } + + + +} \ No newline at end of file diff --git a/akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/VotingService.java b/akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/VotingService.java new file mode 100644 index 0000000000..7e4c6497e7 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/VotingService.java 
@@ -0,0 +1,149 @@ +package sample.distributeddata; + +import java.util.Optional; +import java.util.HashMap; +import java.math.BigInteger; +import java.util.Map; +import scala.PartialFunction; +import scala.runtime.BoxedUnit; +import scala.concurrent.duration.Duration; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.cluster.Cluster; +import akka.cluster.ddata.*; +import akka.japi.pf.ReceiveBuilder; + +import static akka.cluster.ddata.Replicator.*; +import static java.util.concurrent.TimeUnit.SECONDS; + +@SuppressWarnings("unchecked") +public class VotingService extends AbstractActor { + + public static final String OPEN = "open"; + public static final String CLOSE = "close"; + public static final String GET_VOTES = "getVotes"; + + public static class Votes { + public final Map result; + public final boolean open; + + public Votes(Map result, boolean open) { + this.result = result; + this.open = open; + } + } + + public static class Vote { + public final String participant; + + public Vote(String participant) { + this.participant = participant; + } + } + + private final ActorRef replicator = DistributedData.get(context().system()).replicator(); + private final Cluster node = Cluster.get(context().system()); + + private final Key openedKey = FlagKey.create("contestOpened"); + private final Key closedKey = FlagKey.create("contestClosed"); + private final Key countersKey = PNCounterMapKey.create("contestCounters"); + private final WriteConsistency writeAll = new WriteAll(Duration.create(5, SECONDS)); + private final ReadConsistency readAll = new ReadAll(Duration.create(3, SECONDS)); + + @Override + public void preStart() { + replicator.tell(new Subscribe<>(openedKey, self()), ActorRef.noSender()); + } + + public VotingService() { + receive(ReceiveBuilder + .matchEquals(OPEN, cmd -> receiveOpen()) + .match(Changed.class, c -> c.key().equals(openedKey), c -> receiveOpenedChanged((Changed) c)) + .matchEquals(GET_VOTES, cmd -> 
receiveGetVotesEmpty()) + .build()); + } + + + private void receiveOpen() { + Update update = new Update<>(openedKey, Flag.create(), writeAll, curr -> curr.switchOn()); + replicator.tell(update, self()); + becomeOpen(); + } + + private void becomeOpen() { + replicator.tell(new Unsubscribe<>(openedKey, self()), ActorRef.noSender()); + replicator.tell(new Subscribe<>(closedKey, self()), ActorRef.noSender()); + context().become(matchOpen().orElse(matchGetVotes(true))); + } + + private void receiveOpenedChanged(Changed c) { + if (c.dataValue().enabled()) + becomeOpen(); + } + + private void receiveGetVotesEmpty() { + sender().tell(new Votes(new HashMap<>(), false), self()); + } + + private PartialFunction matchOpen() { + return ReceiveBuilder + .match(Vote.class, vote -> receiveVote(vote)) + .match(UpdateSuccess.class, u -> receiveUpdateSuccess()) + .matchEquals(CLOSE, cmd -> receiveClose()) + .match(Changed.class, c -> c.key().equals(closedKey), c -> receiveClosedChanged((Changed) c)) + .build(); + } + + private void receiveVote(Vote vote) { + Update update = new Update<>(countersKey, PNCounterMap.create(), Replicator.writeLocal(), + curr -> curr.increment(node, vote.participant, 1)); + replicator.tell(update, self()); + } + + private void receiveUpdateSuccess() { + // ok + } + + private void receiveClose() { + Update update = new Update<>(closedKey, Flag.create(), writeAll, curr -> curr.switchOn()); + replicator.tell(update, self()); + context().become(matchGetVotes(false)); + } + + private void receiveClosedChanged(Changed c) { + if (c.dataValue().enabled()) + context().become(matchGetVotes(false)); + } + + private PartialFunction matchGetVotes(boolean open) { + return ReceiveBuilder + .matchEquals(GET_VOTES, s -> receiveGetVotes()) + .match(NotFound.class, n -> n.key().equals(countersKey), n -> receiveNotFound(open, (NotFound) n)) + .match(GetSuccess.class, g -> g.key().equals(countersKey), + g -> receiveGetSuccess(open, (GetSuccess) g)) + .match(GetFailure.class, 
f -> f.key().equals(countersKey), f -> receiveGetFailure()) + .match(UpdateSuccess.class, u -> receiveUpdateSuccess()).build(); + } + + private void receiveGetVotes() { + Optional ctx = Optional.of(sender()); + replicator.tell(new Replicator.Get(countersKey, readAll, ctx), self()); + } + + + private void receiveGetSuccess(boolean open, GetSuccess g) { + Map result = g.dataValue().getEntries(); + ActorRef replyTo = (ActorRef) g.getRequest().get(); + replyTo.tell(new Votes(result, open), self()); + } + + private void receiveNotFound(boolean open, NotFound n) { + ActorRef replyTo = (ActorRef) n.getRequest().get(); + replyTo.tell(new Votes(new HashMap<>(), open), self()); + } + + private void receiveGetFailure() { + // skip + } +} \ No newline at end of file diff --git a/akka-samples/akka-sample-distributed-data-java/src/main/resources/application.conf b/akka-samples/akka-sample-distributed-data-java/src/main/resources/application.conf new file mode 100644 index 0000000000..5bdd1d8522 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/src/main/resources/application.conf @@ -0,0 +1,21 @@ +akka { + actor { + provider = "akka.cluster.ClusterActorRefProvider" + } + remote { + log-remote-lifecycle-events = off + netty.tcp { + hostname = "127.0.0.1" + port = 0 + } + } + + cluster { + seed-nodes = [ + "akka.tcp://ClusterSystem@127.0.0.1:2551", + "akka.tcp://ClusterSystem@127.0.0.1:2552"] + + auto-down-unreachable-after = 10s + } +} + diff --git a/akka-samples/akka-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ReplicatedCacheSpec.scala b/akka-samples/akka-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ReplicatedCacheSpec.scala new file mode 100644 index 0000000000..0af053ad2d --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ReplicatedCacheSpec.scala @@ -0,0 +1,135 @@ +package sample.distributeddata + +import java.util.Optional; +import 
scala.concurrent.duration._ +import akka.cluster.Cluster +import akka.cluster.ddata.DistributedData +import akka.cluster.ddata.Replicator.GetReplicaCount +import akka.cluster.ddata.Replicator.ReplicaCount +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import com.typesafe.config.ConfigFactory + +object ReplicatedCacheSpec extends MultiNodeConfig { + val node1 = role("node-1") + val node2 = role("node-2") + val node3 = role("node-3") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.log-dead-letters-during-shutdown = off + """)) + +} + +class ReplicatedCacheSpecMultiJvmNode1 extends ReplicatedCacheSpec +class ReplicatedCacheSpecMultiJvmNode2 extends ReplicatedCacheSpec +class ReplicatedCacheSpecMultiJvmNode3 extends ReplicatedCacheSpec + +class ReplicatedCacheSpec extends MultiNodeSpec(ReplicatedCacheSpec) with STMultiNodeSpec with ImplicitSender { + import ReplicatedCacheSpec._ + import ReplicatedCache._ + + override def initialParticipants = roles.size + + val cluster = Cluster(system) + val replicatedCache = system.actorOf(ReplicatedCache.props) + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + cluster join node(to).address + } + enterBarrier(from.name + "-joined") + } + + "Demo of a replicated cache" must { + "join cluster" in within(10.seconds) { + join(node1, node1) + join(node2, node1) + join(node3, node1) + + awaitAssert { + DistributedData(system).replicator ! GetReplicaCount + expectMsg(ReplicaCount(roles.size)) + } + enterBarrier("after-1") + } + + "replicate cached entry" in within(10.seconds) { + runOn(node1) { + replicatedCache ! 
new PutInCache("key1", "A") + } + + awaitAssert { + val probe = TestProbe() + replicatedCache.tell(new GetFromCache("key1"), probe.ref) + probe.expectMsg(new Cached("key1", Optional.of("A"))) + } + + enterBarrier("after-2") + } + + "replicate many cached entries" in within(10.seconds) { + runOn(node1) { + for (i ← 100 to 200) + replicatedCache ! new PutInCache("key" + i, i) + } + + awaitAssert { + val probe = TestProbe() + for (i ← 100 to 200) { + replicatedCache.tell(new GetFromCache("key" + i), probe.ref) + probe.expectMsg(new Cached("key" + i, Optional.of(Integer.valueOf(i)))) + } + } + + enterBarrier("after-3") + } + + "replicate evicted entry" in within(15.seconds) { + runOn(node1) { + replicatedCache ! new PutInCache("key2", "B") + } + + awaitAssert { + val probe = TestProbe() + replicatedCache.tell(new GetFromCache("key2"), probe.ref) + probe.expectMsg(new Cached("key2", Optional.of("B"))) + } + enterBarrier("key2-replicated") + + runOn(node3) { + replicatedCache ! new Evict("key2") + } + + awaitAssert { + val probe = TestProbe() + replicatedCache.tell(new GetFromCache("key2"), probe.ref) + probe.expectMsg(new Cached("key2", Optional.empty())) + } + + enterBarrier("after-4") + } + + "replicate updated cached entry" in within(10.seconds) { + runOn(node2) { + replicatedCache ! new PutInCache("key1", "A2") + replicatedCache ! 
new PutInCache("key1", "A3") + } + + awaitAssert { + val probe = TestProbe() + replicatedCache.tell(new GetFromCache("key1"), probe.ref) + probe.expectMsg(new Cached("key1", Optional.of("A3"))) + } + + enterBarrier("after-5") + } + + } + +} + diff --git a/akka-samples/akka-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala b/akka-samples/akka-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala new file mode 100644 index 0000000000..afca412c71 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala @@ -0,0 +1,92 @@ +package sample.distributeddata + +import scala.concurrent.duration._ +import akka.cluster.Cluster +import akka.cluster.ddata.DistributedData +import akka.cluster.ddata.Replicator.GetReplicaCount +import akka.cluster.ddata.Replicator.ReplicaCount +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import scala.collection.JavaConverters._ + +object ReplicatedMetricsSpec extends MultiNodeConfig { + val node1 = role("node-1") + val node2 = role("node-2") + val node3 = role("node-3") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.log-dead-letters-during-shutdown = off + """)) + +} + +class ReplicatedMetricsSpecMultiJvmNode1 extends ReplicatedMetricsSpec +class ReplicatedMetricsSpecMultiJvmNode2 extends ReplicatedMetricsSpec +class ReplicatedMetricsSpecMultiJvmNode3 extends ReplicatedMetricsSpec + +class ReplicatedMetricsSpec extends MultiNodeSpec(ReplicatedMetricsSpec) with STMultiNodeSpec with ImplicitSender { + import ReplicatedMetricsSpec._ + import ReplicatedMetrics._ + + override def initialParticipants = roles.size + + val cluster = 
Cluster(system) + val replicatedMetrics = system.actorOf(ReplicatedMetrics.props(1.second, 3.seconds)) + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + cluster join node(to).address + } + enterBarrier(from.name + "-joined") + } + + "Demo of a replicated metrics" must { + "join cluster" in within(10.seconds) { + join(node1, node1) + join(node2, node1) + join(node3, node1) + + awaitAssert { + DistributedData(system).replicator ! GetReplicaCount + expectMsg(ReplicaCount(roles.size)) + } + enterBarrier("after-1") + } + + "replicate metrics" in within(10.seconds) { + val probe = TestProbe() + system.eventStream.subscribe(probe.ref, classOf[UsedHeap]) + awaitAssert { + probe.expectMsgType[UsedHeap](1.second).percentPerNode.size should be(3) + } + probe.expectMsgType[UsedHeap].percentPerNode.size should be(3) + probe.expectMsgType[UsedHeap].percentPerNode.size should be(3) + enterBarrier("after-2") + } + + "cleanup removed node" in within(25.seconds) { + val node3Address = node(node3).address + runOn(node1) { + cluster.leave(node3Address) + } + runOn(node1, node2) { + val probe = TestProbe() + system.eventStream.subscribe(probe.ref, classOf[UsedHeap]) + awaitAssert { + probe.expectMsgType[UsedHeap](1.second).percentPerNode.size should be(2) + } + probe.expectMsgType[UsedHeap].percentPerNode.asScala.toMap should not contain ( + nodeKey(node3Address)) + } + enterBarrier("after-3") + } + + } + +} + diff --git a/akka-samples/akka-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/STMultiNodeSpec.scala b/akka-samples/akka-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/STMultiNodeSpec.scala new file mode 100644 index 0000000000..0daad1df58 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/STMultiNodeSpec.scala @@ -0,0 +1,17 @@ +package sample.distributeddata + +import akka.remote.testkit.MultiNodeSpecCallbacks + +import org.scalatest.{ 
BeforeAndAfterAll, WordSpecLike } +import org.scalatest.Matchers + +/** + * Hooks up MultiNodeSpec with ScalaTest + */ +trait STMultiNodeSpec extends MultiNodeSpecCallbacks + with WordSpecLike with Matchers with BeforeAndAfterAll { + + override def beforeAll() = multiNodeSpecBeforeAll() + + override def afterAll() = multiNodeSpecAfterAll() +} diff --git a/akka-samples/akka-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ServiceRegistrySpec.scala b/akka-samples/akka-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ServiceRegistrySpec.scala new file mode 100644 index 0000000000..956eb92744 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ServiceRegistrySpec.scala @@ -0,0 +1,142 @@ +package sample.distributeddata + +import scala.concurrent.duration._ +import akka.actor.Actor +import akka.actor.PoisonPill +import akka.actor.Props +import akka.cluster.Cluster +import akka.cluster.ddata.DistributedData +import akka.cluster.ddata.Replicator.GetReplicaCount +import akka.cluster.ddata.Replicator.ReplicaCount +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import scala.collection.JavaConverters._ + +object ServiceRegistrySpec extends MultiNodeConfig { + val node1 = role("node-1") + val node2 = role("node-2") + val node3 = role("node-3") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.log-dead-letters-during-shutdown = off + """)) + + class Service extends Actor { + def receive = { + case s: String ⇒ sender() ! 
self.path.name + ": " + s + } + } + +} + +class ServiceRegistrySpecMultiJvmNode1 extends ServiceRegistrySpec +class ServiceRegistrySpecMultiJvmNode2 extends ServiceRegistrySpec +class ServiceRegistrySpecMultiJvmNode3 extends ServiceRegistrySpec + +class ServiceRegistrySpec extends MultiNodeSpec(ServiceRegistrySpec) with STMultiNodeSpec with ImplicitSender { + import ServiceRegistrySpec._ + import ServiceRegistry._ + + override def initialParticipants = roles.size + + val cluster = Cluster(system) + val registry = system.actorOf(ServiceRegistry.props) + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + cluster join node(to).address + } + enterBarrier(from.name + "-joined") + } + + "Demo of a replicated service registry" must { + "join cluster" in within(10.seconds) { + join(node1, node1) + join(node2, node1) + join(node3, node1) + + awaitAssert { + DistributedData(system).replicator ! GetReplicaCount + expectMsg(ReplicaCount(roles.size)) + } + enterBarrier("after-1") + } + + "replicate service entry" in within(10.seconds) { + runOn(node1) { + val a1 = system.actorOf(Props[Service], name = "a1") + registry ! new Register("a", a1) + } + + awaitAssert { + val probe = TestProbe() + registry.tell(new Lookup("a"), probe.ref) + probe.expectMsgType[Bindings].services.asScala.map(_.path.name).toSet should be(Set("a1")) + } + + enterBarrier("after-2") + } + + "replicate updated service entry, and publish to even bus" in { + val probe = TestProbe() + system.eventStream.subscribe(probe.ref, classOf[BindingChanged]) + + runOn(node2) { + val a2 = system.actorOf(Props[Service], name = "a2") + registry ! 
new Register("a", a2) + } + + probe.within(10.seconds) { + probe.expectMsgType[BindingChanged].services.asScala.map(_.path.name).toSet should be(Set("a1", "a2")) + registry.tell(new Lookup("a"), probe.ref) + probe.expectMsgType[Bindings].services.asScala.map(_.path.name).toSet should be(Set("a1", "a2")) + } + + enterBarrier("after-4") + } + + "remove terminated service" in { + val probe = TestProbe() + system.eventStream.subscribe(probe.ref, classOf[BindingChanged]) + + runOn(node2) { + registry.tell(new Lookup("a"), probe.ref) + val a2 = probe.expectMsgType[Bindings].services.asScala.find(_.path.name == "a2").get + a2 ! PoisonPill + } + + probe.within(10.seconds) { + probe.expectMsgType[BindingChanged].services.asScala.map(_.path.name).toSet should be(Set("a1")) + registry.tell(new Lookup("a"), probe.ref) + probe.expectMsgType[Bindings].services.asScala.map(_.path.name).toSet should be(Set("a1")) + } + + enterBarrier("after-5") + } + + "replicate many service entries" in within(10.seconds) { + for (i ← 100 until 200) { + val service = system.actorOf(Props[Service], name = myself.name + "_" + i) + registry ! 
new Register("a" + i, service) + } + + awaitAssert { + val probe = TestProbe() + for (i ← 100 until 200) { + registry.tell(new Lookup("a" + i), probe.ref) + probe.expectMsgType[Bindings].services.asScala.map(_.path.name).toSet should be(roles.map(_.name + "_" + i).toSet) + } + } + + enterBarrier("after-6") + } + + } + +} + diff --git a/akka-samples/akka-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ShoppingCartSpec.scala b/akka-samples/akka-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ShoppingCartSpec.scala new file mode 100644 index 0000000000..f796c516e7 --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ShoppingCartSpec.scala @@ -0,0 +1,101 @@ +package sample.distributeddata + +import scala.concurrent.duration._ +import akka.cluster.Cluster +import akka.cluster.ddata.DistributedData +import akka.cluster.ddata.Replicator.GetReplicaCount +import akka.cluster.ddata.Replicator.ReplicaCount +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import scala.collection.JavaConverters._ + +object ShoppingCartSpec extends MultiNodeConfig { + val node1 = role("node-1") + val node2 = role("node-2") + val node3 = role("node-3") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.log-dead-letters-during-shutdown = off + """)) + +} + +class ShoppingCartSpecMultiJvmNode1 extends ShoppingCartSpec +class ShoppingCartSpecMultiJvmNode2 extends ShoppingCartSpec +class ShoppingCartSpecMultiJvmNode3 extends ShoppingCartSpec + +class ShoppingCartSpec extends MultiNodeSpec(ShoppingCartSpec) with STMultiNodeSpec with ImplicitSender { + import ShoppingCartSpec._ + import ShoppingCart._ + + override def initialParticipants = roles.size + + val cluster = 
Cluster(system) + val shoppingCart = system.actorOf(ShoppingCart.props("user-1")) + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + cluster join node(to).address + } + enterBarrier(from.name + "-joined") + } + + "Demo of a replicated shopping cart" must { + "join cluster" in within(10.seconds) { + join(node1, node1) + join(node2, node1) + join(node3, node1) + + awaitAssert { + DistributedData(system).replicator ! GetReplicaCount + expectMsg(ReplicaCount(roles.size)) + } + enterBarrier("after-1") + } + + "handle updates directly after start" in within(15.seconds) { + runOn(node2) { + shoppingCart ! new ShoppingCart.AddItem(new LineItem("1", "Apples", 2)) + shoppingCart ! new ShoppingCart.AddItem(new LineItem("2", "Oranges", 3)) + } + enterBarrier("updates-done") + + awaitAssert { + shoppingCart ! ShoppingCart.GET_CART + val cart = expectMsgType[Cart] + cart.items.asScala.toSet should be(Set( + new LineItem("1", "Apples", 2), new LineItem("2", "Oranges", 3))) + } + + enterBarrier("after-2") + } + + "handle updates from different nodes" in within(5.seconds) { + runOn(node2) { + shoppingCart ! new ShoppingCart.AddItem(new LineItem("1", "Apples", 5)) + shoppingCart ! new ShoppingCart.RemoveItem("2") + } + runOn(node3) { + shoppingCart ! new ShoppingCart.AddItem(new LineItem("3", "Bananas", 4)) + } + enterBarrier("updates-done") + + awaitAssert { + shoppingCart ! 
ShoppingCart.GET_CART + val cart = expectMsgType[Cart] + cart.items.asScala.toSet should be( + Set(new LineItem("1", "Apples", 7), new LineItem("3", "Bananas", 4))) + } + + enterBarrier("after-3") + } + + } + +} + diff --git a/akka-samples/akka-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/VotingServiceSpec.scala b/akka-samples/akka-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/VotingServiceSpec.scala new file mode 100644 index 0000000000..9b8b9501fc --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/VotingServiceSpec.scala @@ -0,0 +1,101 @@ +package sample.distributeddata + +import java.math.BigInteger +import scala.concurrent.duration._ +import akka.actor.Props +import akka.cluster.Cluster +import akka.cluster.ddata.DistributedData +import akka.cluster.ddata.Replicator.GetReplicaCount +import akka.cluster.ddata.Replicator.ReplicaCount +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import com.typesafe.config.ConfigFactory + +object VotingServiceSpec extends MultiNodeConfig { + val node1 = role("node-1") + val node2 = role("node-2") + val node3 = role("node-3") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.log-dead-letters-during-shutdown = off + """)) + +} + +class VotingServiceSpecMultiJvmNode1 extends VotingServiceSpec +class VotingServiceSpecMultiJvmNode2 extends VotingServiceSpec +class VotingServiceSpecMultiJvmNode3 extends VotingServiceSpec + +class VotingServiceSpec extends MultiNodeSpec(VotingServiceSpec) with STMultiNodeSpec with ImplicitSender { + import VotingServiceSpec._ + + override def initialParticipants = roles.size + + val cluster = Cluster(system) + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + cluster join 
node(to).address + } + enterBarrier(from.name + "-joined") + } + + "Demo of a replicated voting" must { + + "join cluster" in within(10.seconds) { + join(node1, node1) + join(node2, node1) + join(node3, node1) + + awaitAssert { + DistributedData(system).replicator ! GetReplicaCount + expectMsg(ReplicaCount(roles.size)) + } + enterBarrier("after-1") + } + + "count votes correctly" in within(15.seconds) { + import VotingService._ + val votingService = system.actorOf(Props[VotingService], "votingService") + val N = 1000 + runOn(node1) { + votingService ! VotingService.OPEN + for (n ← 1 to N) { + votingService ! new Vote("#" + ((n % 20) + 1)) + } + } + runOn(node2, node3) { + // wait for it to open + val p = TestProbe() + awaitAssert { + votingService.tell(VotingService.GET_VOTES, p.ref) + p.expectMsgType[Votes](3.seconds).open should be(true) + } + for (n ← 1 to N) { + votingService ! new Vote("#" + ((n % 20) + 1)) + } + } + enterBarrier("voting-done") + runOn(node3) { + votingService ! VotingService.CLOSE + } + + val expected = (1 to 20).map(n ⇒ "#" + n -> BigInteger.valueOf(3L * N / 20)).toMap + awaitAssert { + votingService ! VotingService.GET_VOTES + val votes = expectMsgType[Votes](3.seconds) + votes.open should be (false) + import scala.collection.JavaConverters._ + votes.result.asScala.toMap should be (expected) + } + + enterBarrier("after-2") + } + } + +} + diff --git a/akka-samples/akka-sample-distributed-data-java/tutorial/index.html b/akka-samples/akka-sample-distributed-data-java/tutorial/index.html new file mode 100644 index 0000000000..d2e9da6d7a --- /dev/null +++ b/akka-samples/akka-sample-distributed-data-java/tutorial/index.html @@ -0,0 +1,306 @@ + + +Akka Distributed Data Samples with Java + + + + +
+

+This tutorial contains 5 samples illustrating how to use +Akka Distributed Data. +

+
    +
  • Low Latency Voting Service
  • +
  • Highly Available Shopping Cart
  • +
  • Distributed Service Registry
  • +
  • Replicated Cache
  • +
  • Replicated Metrics
  • +
+ +

+Akka Distributed Data is useful when you need to share data between nodes in an +Akka Cluster. The data is accessed with an actor providing a key-value store like API. +The keys are unique identifiers with type information of the data values. The values +are Conflict Free Replicated Data Types (CRDTs). +

+ +

+All data entries are spread to all nodes, or nodes with a certain role, in the cluster +via direct replication and gossip based dissemination. You have fine grained control +of the consistency level for reads and writes. +

+ +

+The nature of CRDTs makes it possible to perform updates from any node without coordination. +Concurrent updates from different nodes will automatically be resolved by the monotonic +merge function, which all data types must provide. The state changes always converge. +Several useful data types for counters, sets, maps and registers are provided and +you can also implement your own custom data types. +

+ +

+It is eventually consistent and geared toward providing high read and write availability +(partition tolerance), with low latency. Note that in an eventually consistent system a read may return an +out-of-date value. +

+ +

+Note that there are some +Limitations +that you should be aware of. For example, Akka Distributed Data is not intended for Big Data. +

+ +
+ +
+ +

Low Latency Voting Service

+ +

+Distributed Data is great for low latency services, since you can update or get data from the local replica +without immediate communication with other nodes. +

+ +

+Open VotingService.java. +

+ +

+VotingService is an actor for low latency counting of votes on several cluster nodes and aggregation +of the grand total number of votes. The actor is started on each cluster node. First it expects an +OPEN message on one or several nodes. After that the counting can begin. The open +signal is immediately replicated to all nodes with a boolean +Flag. +Note writeAll. +

+ +

+Update<Flag> update = new Update<>(openedKey, Flag.create(), writeAll, curr -> curr.switchOn());
+
+ +

+The actor is subscribing to changes of the OpenedKey and other instances of this actor, +also on other nodes, will be notified when the flag is changed. +

+ +

+replicator.tell(new Subscribe<>(openedKey, self()), ActorRef.noSender());
+
+ +

+.match(Changed.class, c -> c.key().equals(openedKey), c -> receiveOpenedChanged((Changed<Flag>) c))
+
+ +

+The counters are kept in a +PNCounterMap +and updated with: +

+ +

+Update<PNCounterMap> update = new Update<>(countersKey, PNCounterMap.create(), Replicator.writeLocal(),
+        curr -> curr.increment(node, vote.participant, 1));
+ replicator.tell(update, self());
+
+ +

+Incrementing the counter is very fast, since it only involves communication with the local +Replicator actor. Note writeLocal. Those updates are also spread +to other nodes, but that is performed in the background. +

+ +

+The total number of votes is retrieved with: +

+ +

+Optional<Object> ctx = Optional.of(sender());
+replicator.tell(new Replicator.Get<PNCounterMap>(countersKey, readAll, ctx), self());
+
+ +

+.match(GetSuccess.class, g -> g.key().equals(countersKey),
+   g -> receiveGetSuccess(open, (GetSuccess<PNCounterMap>) g))
+
+ +

+private void receiveGetSuccess(boolean open, GetSuccess<PNCounterMap> g) {
+  Map<String, BigInteger> result = g.dataValue().getEntries();
+  ActorRef replyTo = (ActorRef) g.getRequest().get();
+  replyTo.tell(new Votes(result, open), self());
+}
+
+ +

+The multi-node test for the VotingService can be found in +VotingServiceSpec.scala. +

+ +

+Read the +Using the Replicator +documentation for more details of how to use Get, Update, and Subscribe. +

+ +
+ +
+

Highly Available Shopping Cart

+ +

+Distributed Data is great for highly available services, since it is possible to perform +updates to the local node (or currently available nodes) during a network partition. +

+ +

+Open ShoppingCart.java. +

+ +

+ShoppingCart is an actor that holds the selected items to buy for a user. +The actor instance for a specific user may be started wherever needed in the cluster, i.e. several +instances may be started on different nodes and used at the same time. +

+ +

+Each product in the cart is represented by a LineItem and all items in the cart +are collected in an LWWMap. +

+ +

+The actor handles the commands GET_CART, AddItem and RemoveItem. +To get the latest updates in case the same shopping cart is used from several nodes it is using +a consistency level of readMajority and writeMajority, but that is only +done to reduce the risk of seeing old data. If such reads and writes cannot be completed due to a +network partition it falls back to reading/writing from the local replica (see GetFailure). +Local reads and writes will always be successful and when the network partition heals the updated +shopping carts will be disseminated by the +gossip protocol +and the LWWMap CRDTs are merged, i.e. it is a highly available shopping cart. +

+ +

+The multi-node test for the ShoppingCart can be found in +ShoppingCartSpec.scala. +

+ +

+Read the +Consistency +section in the documentation to understand the consistency considerations. +

+ +
+ +
+

Distributed Service Registry

+ +

+Have you ever had the need to look up actors by name in an Akka Cluster? +This example illustrates how you could implement such a registry. It is probably not +feature complete, but should be a good starting point. +

+ +

+Open ServiceRegistry.java. +

+ +

+ServiceRegistry is an actor that is started on each node in the cluster. +It supports two basic commands: +

+
    +
  • Register to bind an ActorRef to a name, + several actors can be bound to the same name
  • +
  • Lookup get currently bound services of a given name
  • +
+ +

+For each named service it is using an +ORSet. +Here we are using top level ORSet entries. An alternative would have been to use an +ORMultiMap holding all services. That would have a disadvantage if we have many services. +When a data entry is changed the full state of that entry is replicated to other nodes, i.e. when you +update a map the whole map is replicated. +

+ +

+The ServiceRegistry is subscribing to changes of a GSet where we add +the names of all services. It is also subscribing to all such service keys to get notifications when +actors are added to or removed from a named service. +

+ +

+The multi-node test for the ServiceRegistry can be found in +ServiceRegistrySpec.scala. +

+ +
+ +
+

Replicated Cache

+ +

+This example illustrates a simple key-value cache. +

+ +

+Open ReplicatedCache.java. +

+ +

+ReplicatedCache is an actor that is started on each node in the cluster. +It supports three commands: PutInCache, GetFromCache and Evict. +

+ +

+It is splitting up the key space in 100 top level keys, each with a LWWMap. +When a data entry is changed the full state of that entry is replicated to other nodes, i.e. when you +update a map the whole map is replicated. Therefore, instead of using one ORMap with 1000 elements it +is more efficient to split that up in 100 top level ORMap entries with 10 elements each. Top level +entries are replicated individually, which has the trade-off that different entries may not be +replicated at the same time and you may see inconsistencies between related entries. +Separate top level entries cannot be updated atomically together. +

+ +

+The multi-node test for the ReplicatedCache can be found in +ReplicatedCacheSpec.scala. +

+ +
+ +
+

Replicated Metrics

+ +

+This example illustrates how to spread metrics data to all nodes in an Akka cluster. +

+ +

+Open ReplicatedMetrics.java. +

+ +

+ReplicatedMetrics is an actor that is started on each node in the cluster. +Periodically it collects some metrics, in this case used and max heap size. +Each metrics type is stored in a LWWMap where the key in the map is the address of +the node. The values are disseminated to other nodes with the gossip protocol. +

+ +

+The multi-node test for the ReplicatedMetrics can be found in +ReplicatedMetricsSpec.scala. +

+ +

+Note that there are some +Limitations +that you should be aware of. For example, Akka Distributed Data is not intended for Big Data. +

+ +
+ + + diff --git a/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedMetrics.scala b/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedMetrics.scala index f4c0b140d9..ce056545ef 100644 --- a/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedMetrics.scala +++ b/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedMetrics.scala @@ -101,6 +101,8 @@ class ReplicatedMetrics(measureInterval: FiniteDuration, cleanupInterval: Finite case MemberRemoved(m, _) ⇒ nodesInCluster -= nodeKey(m.address) + if (m.address == cluster.selfAddress) + context.stop(self) case Cleanup ⇒ def cleanupRemoved(data: LWWMap[Long]): LWWMap[Long] = diff --git a/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ServiceRegistry.scala b/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ServiceRegistry.scala index dfe17559e5..4d6c91df1e 100644 --- a/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ServiceRegistry.scala +++ b/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ServiceRegistry.scala @@ -35,7 +35,7 @@ object ServiceRegistry { */ final case class Bindings(name: String, services: Set[ActorRef]) /** - * Published to `System.eventStream` when services are changed. + * Published to `ActorSystem.eventStream` when services are changed. */ final case class BindingChanged(name: String, services: Set[ActorRef]) @@ -78,8 +78,8 @@ class ServiceRegistry extends Actor with ActorLogging { // add the service replicator ! Update(dKey, ORSet(), WriteLocal)(_ + service) - case Lookup(key) ⇒ - sender() ! Bindings(key, services.getOrElse(key, Set.empty)) + case Lookup(name) ⇒ + sender() ! 
Bindings(name, services.getOrElse(name, Set.empty)) case c @ Changed(AllServicesKey) ⇒ val newKeys = c.get(AllServicesKey).elements diff --git a/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala b/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala index 97cd57163c..89d94fbda5 100644 --- a/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala +++ b/akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala @@ -69,15 +69,16 @@ class ShoppingCart(userId: String) extends Actor { cart ⇒ updateCart(cart, item) } replicator ! update - - case GetFailure(DataKey, Some(AddItem(item))) ⇒ - // ReadMajority of Update failed, fall back to best effort local value - replicator ! Update(DataKey, LWWMap.empty[LineItem], writeMajority, None) { - cart ⇒ updateCart(cart, item) - } } //#add-item + def updateCart(data: LWWMap[LineItem], item: LineItem): LWWMap[LineItem] = + data.get(item.productId) match { + case Some(LineItem(_, _, existingQuantity)) ⇒ + data + (item.productId -> item.copy(quantity = existingQuantity + item.quantity)) + case None ⇒ data + (item.productId -> item) + } + //#remove-item def receiveRemoveItem: Receive = { case cmd @ RemoveItem(productId) ⇒ @@ -107,11 +108,4 @@ class ShoppingCart(userId: String) extends Actor { case e: UpdateFailure[_] ⇒ throw new IllegalStateException("Unexpected failure: " + e) } - def updateCart(data: LWWMap[LineItem], item: LineItem): LWWMap[LineItem] = - data.get(item.productId) match { - case Some(LineItem(_, _, existingQuantity)) ⇒ - data + (item.productId -> item.copy(quantity = existingQuantity + item.quantity)) - case None ⇒ data + (item.productId -> item) - } - } diff --git a/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala 
b/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala index 713c7ef63f..8def9c620c 100644 --- a/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala +++ b/akka-samples/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala @@ -61,14 +61,14 @@ class ReplicatedMetricsSpec extends MultiNodeSpec(ReplicatedMetricsSpec) with ST val probe = TestProbe() system.eventStream.subscribe(probe.ref, classOf[UsedHeap]) awaitAssert { - probe.expectMsgType[UsedHeap].percentPerNode.size should be(3) + probe.expectMsgType[UsedHeap](1.second).percentPerNode.size should be(3) } probe.expectMsgType[UsedHeap].percentPerNode.size should be(3) probe.expectMsgType[UsedHeap].percentPerNode.size should be(3) enterBarrier("after-2") } - "cleanup removed node" in within(15.seconds) { + "cleanup removed node" in within(25.seconds) { val node3Address = node(node3).address runOn(node1) { cluster.leave(node3Address) @@ -77,7 +77,7 @@ class ReplicatedMetricsSpec extends MultiNodeSpec(ReplicatedMetricsSpec) with ST val probe = TestProbe() system.eventStream.subscribe(probe.ref, classOf[UsedHeap]) awaitAssert { - probe.expectMsgType[UsedHeap].percentPerNode.size should be(2) + probe.expectMsgType[UsedHeap](1.second).percentPerNode.size should be(2) } probe.expectMsgType[UsedHeap].percentPerNode should not contain ( nodeKey(node3Address)) diff --git a/akka-samples/akka-sample-distributed-data-scala/tutorial/index.html b/akka-samples/akka-sample-distributed-data-scala/tutorial/index.html index 71d85524b9..b0e7690d24 100644 --- a/akka-samples/akka-sample-distributed-data-scala/tutorial/index.html +++ b/akka-samples/akka-sample-distributed-data-scala/tutorial/index.html @@ -45,6 +45,12 @@ It is eventually consistent and geared toward providing high read and write avai out-of-date value.

+

+Note that there are some +Limitations +that you should be aware of. For example, Akka Distributed Data is not intended for Big Data. +

+
@@ -127,6 +133,7 @@ The multi-node test for the VotingService can be found in Read the Using the Replicator documentation for more details of how to use Get, Update, and Subscribe. +

diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index d33e297e88..e49ce06c35 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -212,7 +212,7 @@ object AkkaBuild extends Build { sampleMainJava, sampleMainScala, sampleMainJavaLambda, sampleMultiNodeScala, samplePersistenceJava, samplePersistenceScala, samplePersistenceJavaLambda, sampleRemoteJava, sampleRemoteScala, sampleSupervisionJavaLambda, - sampleDistributedDataScala) + sampleDistributedDataScala, sampleDistributedDataJava) ) lazy val sampleCamelJava = Sample.project("akka-sample-camel-java") @@ -240,6 +240,7 @@ object AkkaBuild extends Build { lazy val sampleSupervisionJavaLambda = Sample.project("akka-sample-supervision-java-lambda") lazy val sampleDistributedDataScala = Sample.project("akka-sample-distributed-data-scala") + lazy val sampleDistributedDataJava = Sample.project("akka-sample-distributed-data-java") lazy val osgiDiningHakkersSampleMavenTest = Project(id = "akka-sample-osgi-dining-hakkers-maven-test", base = file("akka-samples/akka-sample-osgi-dining-hakkers-maven-test"),