=cdd #17778 Convert samples to java

Patrik Nordwall 2015-07-01 09:46:58 +02:00
parent 7bfc56f3f0
commit 94a61c7eb2
32 changed files with 2220 additions and 67 deletions


@@ -44,8 +44,19 @@ final class LWWMap[A] private[akka] (
type T = LWWMap[A]
/**
* Scala API: All entries of the map.
*/
def entries: Map[String, A] = underlying.entries.map { case (k, r) ⇒ k -> r.value }
/**
* Java API: All entries of the map.
*/
def getEntries(): java.util.Map[String, A] = {
import scala.collection.JavaConverters._
entries.asJava
}
def get(key: String): Option[A] = underlying.get(key).map(_.value)
def contains(key: String): Boolean = underlying.contains(key)
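For illustration, a minimal Java sketch of the new accessor (assuming an LWWMap<String> instance named map, e.g. obtained from a Changed or GetSuccess message):

    // getEntries() exposes the replicated entries as a java.util.Map view
    java.util.Map<String, String> entries = map.getEntries();
    for (java.util.Map.Entry<String, String> e : entries.entrySet())
      System.out.println(e.getKey() + " -> " + e.getValue());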


@@ -37,7 +37,10 @@ final class PNCounterMap private[akka] (
def entries: Map[String, BigInt] = underlying.entries.map { case (k, c) ⇒ k -> c.value }
/** Java API */
def getEntries: Map[String, BigInteger] = underlying.entries.map { case (k, c) ⇒ k -> c.value.bigInteger }
def getEntries: java.util.Map[String, BigInteger] = {
import scala.collection.JavaConverters._
underlying.entries.map { case (k, c) ⇒ k -> c.value.bigInteger }.asJava
}
/**
* Scala API: The count for a key


@@ -217,6 +217,13 @@ object Replicator {
* Java API: `Get` value from local `Replicator`, i.e. `ReadLocal` consistency.
*/
def this(key: Key[A], consistency: ReadConsistency) = this(key, consistency, None)
/**
* Java API: `Get` value from local `Replicator`, i.e. `ReadLocal` consistency.
*/
def this(key: Key[A], consistency: ReadConsistency, request: Optional[Any]) =
this(key, consistency, Option(request.orElse(null)))
}
sealed abstract class GetResponse[A <: ReplicatedData] extends NoSerializationVerificationNeeded {
def key: Key[A]
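As a usage sketch of the new constructor taking a java.util.Optional request context (mirroring the Java ShoppingCart sample added later in this commit, where replicator, dataKey and readMajority are fields of the actor):

    // attach the original sender as the optional request context of the Get
    Optional<Object> ctx = Optional.of(sender());
    replicator.tell(new Replicator.Get<LWWMap<LineItem>>(dataKey, readMajority, ctx), self());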


@@ -5,7 +5,6 @@ package docs.ddata;
//#data-bot
import static java.util.concurrent.TimeUnit.SECONDS;
import scala.concurrent.duration.Duration;
import scala.concurrent.forkjoin.ThreadLocalRandom;
@@ -42,41 +41,47 @@ public class DataBot extends AbstractActor {
private final Key<ORSet<String>> dataKey = ORSetKey.create("key");
@SuppressWarnings("unchecked")
public DataBot() {
receive(ReceiveBuilder.
match(String.class, a -> a.equals(TICK), a -> {
String s = String.valueOf((char) ThreadLocalRandom.current().nextInt(97, 123));
if (ThreadLocalRandom.current().nextBoolean()) {
// add
log.info("Adding: {}", s);
Update<ORSet<String>> update = new Update<>(
dataKey,
ORSet.create(),
Replicator.writeLocal(),
curr -> curr.add(node, s));
replicator.tell(update, self());
} else {
// remove
log.info("Removing: {}", s);
Update<ORSet<String>> update = new Update<>(
dataKey,
ORSet.create(),
Replicator.writeLocal(),
curr -> curr.remove(node, s));
replicator.tell(update, self());
}
}).
match(UpdateResponse.class, r -> {
// ignore
}).
match(Changed.class, c -> c.key().equals(dataKey), c -> {
@SuppressWarnings("unchecked")
Changed<ORSet<String>> c2 = c;
ORSet<String> data = c2.dataValue();
log.info("Current elements: {}", data.getElements());
}).
matchAny(o -> log.info("received unknown message")).build()
);
receive(ReceiveBuilder
.match(String.class, a -> a.equals(TICK), a -> receiveTick())
.match(Changed.class, c -> c.key().equals(dataKey), c -> receiveChanged((Changed<ORSet<String>>) c))
.match(UpdateResponse.class, r -> receiveUpdateResponse())
.build());
}
private void receiveTick() {
String s = String.valueOf((char) ThreadLocalRandom.current().nextInt(97, 123));
if (ThreadLocalRandom.current().nextBoolean()) {
// add
log.info("Adding: {}", s);
Update<ORSet<String>> update = new Update<>(
dataKey,
ORSet.create(),
Replicator.writeLocal(),
curr -> curr.add(node, s));
replicator.tell(update, self());
} else {
// remove
log.info("Removing: {}", s);
Update<ORSet<String>> update = new Update<>(
dataKey,
ORSet.create(),
Replicator.writeLocal(),
curr -> curr.remove(node, s));
replicator.tell(update, self());
}
}
private void receiveChanged(Changed<ORSet<String>> c) {
ORSet<String> data = c.dataValue();
log.info("Current elements: {}", data.getElements());
}
private void receiveUpdateResponse() {
// ignore
}


@@ -176,13 +176,11 @@ to 4 nodes and reads from 4 nodes.
Here is an example of using ``writeMajority`` and ``readMajority``:
**FIXME convert this example to Java**
.. includecode:: ../../../akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ShoppingCart.java#read-write-majority
.. includecode:: ../../../akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala#read-write-majority
.. includecode:: ../../../akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ShoppingCart.java#get-cart
.. includecode:: ../../../akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala#get-cart
.. includecode:: ../../../akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala#add-item
.. includecode:: ../../../akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ShoppingCart.java#add-item
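For quick reference, the consistency declarations referenced above look like this in the Java ``ShoppingCart`` sample added by this commit::

    private final WriteConsistency writeMajority =
      new WriteMajority(Duration.create(3, SECONDS));
    private final static ReadConsistency readMajority =
      new ReadMajority(Duration.create(3, SECONDS));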
In some rare cases, when performing an ``Update`` it is necessary to first try to fetch the latest data from
other nodes. That can be done by first sending a ``Get`` with ``ReadMajority`` and then continue with
@@ -194,9 +192,7 @@ performed (hence the name observed-removed set).
The following example illustrates how to do that:
**FIXME convert this example to Java**
.. includecode:: ../../../akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala#remove-item
.. includecode:: ../../../akka-samples/akka-sample-distributed-data-java/src/main/java/sample/distributeddata/ShoppingCart.java#remove-item
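Condensed, the flow in the Java ``ShoppingCart`` sample added by this commit is: send a ``Get`` with ``ReadMajority`` and perform the ``Update`` only when the corresponding ``GetSuccess`` arrives::

    private void receiveRemoveItem(RemoveItem rm) {
      // fetch the latest value from a majority of nodes first, so the removal
      // has seen the element it is about to remove
      Optional<Object> ctx = Optional.of(rm);
      replicator.tell(new Replicator.Get<LWWMap<LineItem>>(dataKey, readMajority, ctx), self());
    }

    private void receiveRemoveItemGetSuccess(GetSuccess<LWWMap<LineItem>> g) {
      RemoveItem rm = (RemoveItem) g.getRequest().get();
      Update<LWWMap<LineItem>> update = new Update<>(dataKey, LWWMap.create(), writeMajority,
          cart -> cart.remove(node, rm.productId));
      replicator.tell(update, self());
    }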
.. warning::
@@ -446,10 +442,8 @@ cluster. Data types that need pruning have to implement the ``RemovedNodePruning``
Samples
=======
**FIXME convert these samples to Java**
Several interesting samples are included and described in the `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
tutorial named `Akka Distributed Data Samples with Scala <http://www.typesafe.com/activator/template/akka-sample-distributed-data-scala>`_.
tutorial named `Akka Distributed Data Samples with Java <http://www.typesafe.com/activator/template/akka-sample-distributed-data-java>`_.
* Low Latency Voting Service
* Highly Available Shopping Cart


@@ -0,0 +1,17 @@
*#
*.iml
*.ipr
*.iws
*.pyc
*.tm.epoch
*.vim
*-shim.sbt
.idea/
/project/plugins/project
project/boot
target/
/logs
.cache
.classpath
.project
.settings


@@ -0,0 +1,121 @@
Creative Commons Legal Code
CC0 1.0 Universal
CREATIVE COMMONS CORPORATION IS NOT A LAW FIRM AND DOES NOT PROVIDE
LEGAL SERVICES. DISTRIBUTION OF THIS DOCUMENT DOES NOT CREATE AN
ATTORNEY-CLIENT RELATIONSHIP. CREATIVE COMMONS PROVIDES THIS
INFORMATION ON AN "AS-IS" BASIS. CREATIVE COMMONS MAKES NO WARRANTIES
REGARDING THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS
PROVIDED HEREUNDER, AND DISCLAIMS LIABILITY FOR DAMAGES RESULTING FROM
THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS PROVIDED
HEREUNDER.
Statement of Purpose
The laws of most jurisdictions throughout the world automatically confer
exclusive Copyright and Related Rights (defined below) upon the creator
and subsequent owner(s) (each and all, an "owner") of an original work of
authorship and/or a database (each, a "Work").
Certain owners wish to permanently relinquish those rights to a Work for
the purpose of contributing to a commons of creative, cultural and
scientific works ("Commons") that the public can reliably and without fear
of later claims of infringement build upon, modify, incorporate in other
works, reuse and redistribute as freely as possible in any form whatsoever
and for any purposes, including without limitation commercial purposes.
These owners may contribute to the Commons to promote the ideal of a free
culture and the further production of creative, cultural and scientific
works, or to gain reputation or greater distribution for their Work in
part through the use and efforts of others.
For these and/or other purposes and motivations, and without any
expectation of additional consideration or compensation, the person
associating CC0 with a Work (the "Affirmer"), to the extent that he or she
is an owner of Copyright and Related Rights in the Work, voluntarily
elects to apply CC0 to the Work and publicly distribute the Work under its
terms, with knowledge of his or her Copyright and Related Rights in the
Work and the meaning and intended legal effect of CC0 on those rights.
1. Copyright and Related Rights. A Work made available under CC0 may be
protected by copyright and related or neighboring rights ("Copyright and
Related Rights"). Copyright and Related Rights include, but are not
limited to, the following:
i. the right to reproduce, adapt, distribute, perform, display,
communicate, and translate a Work;
ii. moral rights retained by the original author(s) and/or performer(s);
iii. publicity and privacy rights pertaining to a person's image or
likeness depicted in a Work;
iv. rights protecting against unfair competition in regards to a Work,
subject to the limitations in paragraph 4(a), below;
v. rights protecting the extraction, dissemination, use and reuse of data
in a Work;
vi. database rights (such as those arising under Directive 96/9/EC of the
European Parliament and of the Council of 11 March 1996 on the legal
protection of databases, and under any national implementation
thereof, including any amended or successor version of such
directive); and
vii. other similar, equivalent or corresponding rights throughout the
world based on applicable law or treaty, and any national
implementations thereof.
2. Waiver. To the greatest extent permitted by, but not in contravention
of, applicable law, Affirmer hereby overtly, fully, permanently,
irrevocably and unconditionally waives, abandons, and surrenders all of
Affirmer's Copyright and Related Rights and associated claims and causes
of action, whether now known or unknown (including existing as well as
future claims and causes of action), in the Work (i) in all territories
worldwide, (ii) for the maximum duration provided by applicable law or
treaty (including future time extensions), (iii) in any current or future
medium and for any number of copies, and (iv) for any purpose whatsoever,
including without limitation commercial, advertising or promotional
purposes (the "Waiver"). Affirmer makes the Waiver for the benefit of each
member of the public at large and to the detriment of Affirmer's heirs and
successors, fully intending that such Waiver shall not be subject to
revocation, rescission, cancellation, termination, or any other legal or
equitable action to disrupt the quiet enjoyment of the Work by the public
as contemplated by Affirmer's express Statement of Purpose.
3. Public License Fallback. Should any part of the Waiver for any reason
be judged legally invalid or ineffective under applicable law, then the
Waiver shall be preserved to the maximum extent permitted taking into
account Affirmer's express Statement of Purpose. In addition, to the
extent the Waiver is so judged Affirmer hereby grants to each affected
person a royalty-free, non transferable, non sublicensable, non exclusive,
irrevocable and unconditional license to exercise Affirmer's Copyright and
Related Rights in the Work (i) in all territories worldwide, (ii) for the
maximum duration provided by applicable law or treaty (including future
time extensions), (iii) in any current or future medium and for any number
of copies, and (iv) for any purpose whatsoever, including without
limitation commercial, advertising or promotional purposes (the
"License"). The License shall be deemed effective as of the date CC0 was
applied by Affirmer to the Work. Should any part of the License for any
reason be judged legally invalid or ineffective under applicable law, such
partial invalidity or ineffectiveness shall not invalidate the remainder
of the License, and in such case Affirmer hereby affirms that he or she
will not (i) exercise any of his or her remaining Copyright and Related
Rights in the Work or (ii) assert any associated claims and causes of
action with respect to the Work, in either case contrary to Affirmer's
express Statement of Purpose.
4. Limitations and Disclaimers.
a. No trademark or patent rights held by Affirmer are waived, abandoned,
surrendered, licensed or otherwise affected by this document.
b. Affirmer offers the Work as-is and makes no representations or
warranties of any kind concerning the Work, express, implied,
statutory or otherwise, including without limitation warranties of
title, merchantability, fitness for a particular purpose, non
infringement, or the absence of latent or other defects, accuracy, or
the present or absence of errors, whether or not discoverable, all to
the greatest extent permissible under applicable law.
c. Affirmer disclaims responsibility for clearing rights of other persons
that may apply to the Work or any use thereof, including without
limitation any person's Copyright and Related Rights in the Work.
Further, Affirmer disclaims responsibility for obtaining any necessary
consents, permissions or other rights required for any use of the
Work.
d. Affirmer understands and acknowledges that Creative Commons is not a
party to this document and has no duty or obligation with respect to
this CC0 or use of the Work.


@@ -0,0 +1,10 @@
Activator Template by Typesafe
Licensed under Public Domain (CC0)
To the extent possible under law, the person who associated CC0 with
this Activator Template has waived all copyright and related or neighboring
rights to this Activator Template.
You should have received a copy of the CC0 legalcode along with this
work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.


@@ -0,0 +1,7 @@
name=akka-sample-distributed-data-java
title=Akka Distributed Data Samples with Java
description=Akka Distributed Data Samples with Java
tags=akka,cluster,java,sample,distributed-data
authorName=Akka Team
authorLink=http://akka.io/
sourceLink=https://github.com/akka/akka


@@ -0,0 +1,47 @@
import com.typesafe.sbt.SbtMultiJvm
import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm
val akkaVersion = "2.4-SNAPSHOT"
val project = Project(
id = "akka-sample-distributed-data-java",
base = file("."),
settings = Project.defaultSettings ++ SbtMultiJvm.multiJvmSettings ++ Seq(
name := "akka-sample-distributed-data-java",
version := "2.4-SNAPSHOT",
scalaVersion := "2.11.6",
scalacOptions in Compile ++= Seq("-encoding", "UTF-8", "-target:jvm-1.8", "-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint"),
javacOptions in Compile ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint:unchecked", "-Xlint:deprecation", "-Xdiags:verbose"),
javacOptions in doc in Compile := Seq("-source", "1.8"),
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-remote" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster" % akkaVersion,
"com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion,
"com.typesafe.akka" %% "akka-multi-node-testkit" % akkaVersion,
"org.scalatest" %% "scalatest" % "2.2.1" % "test"),
javaOptions in run ++= Seq(
"-Xms128m", "-Xmx1024m"),
Keys.fork in run := true,
// make sure that MultiJvm test are compiled by the default test compilation
compile in MultiJvm <<= (compile in MultiJvm) triggeredBy (compile in Test),
// disable parallel tests
parallelExecution in Test := false,
// make sure that MultiJvm tests are executed by the default test target,
// and combine the results from ordinary test and multi-jvm tests
executeTests in Test <<= (executeTests in Test, executeTests in MultiJvm) map {
case (testResults, multiNodeResults) =>
val overall =
if (testResults.overall.id < multiNodeResults.overall.id)
multiNodeResults.overall
else
testResults.overall
Tests.Output(overall,
testResults.events ++ multiNodeResults.events,
testResults.summaries ++ multiNodeResults.summaries)
}
)
) configs (MultiJvm)
fork in run := true


@@ -0,0 +1 @@
sbt.version=0.13.7


@@ -0,0 +1,4 @@
resolvers += Classpaths.typesafeResolver
addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.3.8")


@@ -0,0 +1,3 @@
// This plugin represents functionality that is to be added to sbt in the future
addSbtPlugin("org.scala-sbt" % "sbt-core-next" % "0.1.1")


@@ -0,0 +1,163 @@
package sample.distributeddata;
import static akka.cluster.ddata.Replicator.readLocal;
import static akka.cluster.ddata.Replicator.writeLocal;
import java.util.Optional;
import scala.Option;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ddata.DistributedData;
import akka.cluster.ddata.Key;
import akka.cluster.ddata.LWWMap;
import akka.cluster.ddata.LWWMapKey;
import akka.cluster.ddata.Replicator.Get;
import akka.cluster.ddata.Replicator.GetSuccess;
import akka.cluster.ddata.Replicator.NotFound;
import akka.cluster.ddata.Replicator.Update;
import akka.cluster.ddata.Replicator.UpdateResponse;
import akka.japi.pf.ReceiveBuilder;
@SuppressWarnings("unchecked")
public class ReplicatedCache extends AbstractActor {
static class Request {
public final String key;
public final ActorRef replyTo;
public Request(String key, ActorRef replyTo) {
this.key = key;
this.replyTo = replyTo;
}
}
public static class PutInCache {
public final String key;
public final Object value;
public PutInCache(String key, Object value) {
this.key = key;
this.value = value;
}
}
public static class GetFromCache {
public final String key;
public GetFromCache(String key) {
this.key = key;
}
}
public static class Cached {
public final String key;
public final Optional<Object> value;
public Cached(String key, Optional<Object> value) {
this.key = key;
this.value = value;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((key == null) ? 0 : key.hashCode());
result = prime * result + ((value == null) ? 0 : value.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Cached other = (Cached) obj;
if (key == null) {
if (other.key != null)
return false;
} else if (!key.equals(other.key))
return false;
if (value == null) {
if (other.value != null)
return false;
} else if (!value.equals(other.value))
return false;
return true;
}
@Override
public String toString() {
return "Cached [key=" + key + ", value=" + value + "]";
}
}
public static class Evict {
public final String key;
public Evict(String key) {
this.key = key;
}
}
public static Props props() {
return Props.create(ReplicatedCache.class);
}
private final ActorRef replicator = DistributedData.get(context().system()).replicator();
private final Cluster node = Cluster.get(context().system());
public ReplicatedCache() {
receive(ReceiveBuilder
.match(PutInCache.class, cmd -> receivePutInCache(cmd.key, cmd.value))
.match(Evict.class, cmd -> receiveEvict(cmd.key))
.match(GetFromCache.class, cmd -> receiveGetFromCache(cmd.key))
.match(GetSuccess.class, g -> receiveGetSuccess((GetSuccess<LWWMap<Object>>) g))
.match(NotFound.class, n -> receiveNotFound((NotFound<LWWMap<Object>>) n))
.match(UpdateResponse.class, u -> {})
.build());
}
private void receivePutInCache(String key, Object value) {
Update<LWWMap<Object>> update = new Update<>(dataKey(key), LWWMap.create(), writeLocal(),
curr -> curr.put(node, key, value));
replicator.tell(update, self());
}
private void receiveEvict(String key) {
Update<LWWMap<Object>> update = new Update<>(dataKey(key), LWWMap.create(), writeLocal(),
curr -> curr.remove(node, key));
replicator.tell(update, self());
}
private void receiveGetFromCache(String key) {
Optional<Object> ctx = Optional.of(new Request(key, sender()));
Get<LWWMap<Object>> get = new Get<>(dataKey(key), readLocal(), ctx);
replicator.tell(get, self());
}
private void receiveGetSuccess(GetSuccess<LWWMap<Object>> g) {
Request req = (Request) g.getRequest().get();
Option<Object> valueOption = g.dataValue().get(req.key);
Optional<Object> valueOptional = Optional.ofNullable(valueOption.isDefined() ? valueOption.get() : null);
req.replyTo.tell(new Cached(req.key, valueOptional), self());
}
private void receiveNotFound(NotFound<LWWMap<Object>> n) {
Request req = (Request) n.getRequest().get();
req.replyTo.tell(new Cached(req.key, Optional.empty()), self());
}
private Key<LWWMap<Object>> dataKey(String entryKey) {
return LWWMapKey.create("cache-" + Math.abs(entryKey.hashCode()) % 100);
}
}
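A brief usage sketch (hypothetical caller code from inside another actor, following how the multi-node spec below drives the cache):

    ActorRef replicatedCache = context().system().actorOf(ReplicatedCache.props());
    replicatedCache.tell(new ReplicatedCache.PutInCache("key1", "A"), ActorRef.noSender());
    // the Cached reply is sent back to the sender of GetFromCache
    replicatedCache.tell(new ReplicatedCache.GetFromCache("key1"), self());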


@@ -0,0 +1,167 @@
package sample.distributeddata;
import static akka.cluster.ddata.Replicator.writeLocal;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import scala.concurrent.duration.FiniteDuration;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ddata.DistributedData;
import akka.cluster.ddata.Key;
import akka.cluster.ddata.LWWMap;
import akka.cluster.ddata.LWWMapKey;
import akka.cluster.ddata.Replicator.Changed;
import akka.cluster.ddata.Replicator.Subscribe;
import akka.cluster.ddata.Replicator.Update;
import akka.cluster.ddata.Replicator.UpdateResponse;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
@SuppressWarnings("unchecked")
public class ReplicatedMetrics extends AbstractActor {
public static Props props(FiniteDuration measureInterval, FiniteDuration cleanupInterval) {
return Props.create(ReplicatedMetrics.class, measureInterval, cleanupInterval);
}
public static class UsedHeap {
public Map<String, Double> percentPerNode;
public UsedHeap(Map<String, Double> percentPerNode) {
this.percentPerNode = percentPerNode;
}
}
private static final String TICK = "tick";
private static final String CLEANUP = "cleanup";
public static String nodeKey(Address address) {
return address.host().get() + ":" + address.port().get();
}
private final ActorRef replicator = DistributedData.get(context().system()).replicator();
private final Cluster node = Cluster.get(context().system());
private final String selfNodeKey = nodeKey(node.selfAddress());
private final MemoryMXBean memoryMBean = ManagementFactory.getMemoryMXBean();
private final LoggingAdapter log = Logging.getLogger(context().system(), this);
private final Key<LWWMap<Long>> usedHeapKey = LWWMapKey.create("usedHeap");
private final Key<LWWMap<Long>> maxHeapKey = LWWMapKey.create("maxHeap");
private final Cancellable tickTask;
private final Cancellable cleanupTask;
private Map<String, Long> maxHeap = new HashMap<>();
private final Set<String> nodesInCluster = new HashSet<>();
@Override
public void preStart() {
replicator.tell(new Subscribe<>(maxHeapKey, self()), ActorRef.noSender());
replicator.tell(new Subscribe<>(usedHeapKey, self()), ActorRef.noSender());
node.subscribe(self(), ClusterEvent.initialStateAsEvents(),
MemberUp.class, MemberRemoved.class);
}
@Override
public void postStop() throws Exception {
tickTask.cancel();
cleanupTask.cancel();
node.unsubscribe(self());
super.postStop();
}
public ReplicatedMetrics(FiniteDuration measureInterval, FiniteDuration cleanupInterval) {
tickTask = context().system().scheduler().schedule(measureInterval, measureInterval,
self(), TICK, context().dispatcher(), self());
cleanupTask = context().system().scheduler().schedule(cleanupInterval, cleanupInterval,
self(), CLEANUP, context().dispatcher(), self());
receive(ReceiveBuilder
.matchEquals(TICK, t -> receiveTick())
.match(Changed.class, c -> c.key().equals(maxHeapKey), c -> receiveMaxHeapChanged((Changed<LWWMap<Long>>) c))
.match(Changed.class, c -> c.key().equals(usedHeapKey), c -> receiveUsedHeapChanged((Changed<LWWMap<Long>>) c))
.match(UpdateResponse.class, u -> {})
.match(MemberUp.class, m -> receiveMemberUp(m.member().address()))
.match(MemberRemoved.class, m -> receiveMemberRemoved(m.member().address()))
.matchEquals(CLEANUP, c -> receiveCleanup())
.build());
}
private void receiveTick() {
MemoryUsage heap = memoryMBean.getHeapMemoryUsage();
long used = heap.getUsed();
long max = heap.getMax();
Update<LWWMap<Long>> update1 = new Update<>(usedHeapKey, LWWMap.create(), writeLocal(),
curr -> curr.put(node, selfNodeKey, used));
replicator.tell(update1, self());
Update<LWWMap<Long>> update2 = new Update<>(maxHeapKey, LWWMap.create(), writeLocal(), curr -> {
if (curr.contains(selfNodeKey) && curr.get(selfNodeKey).get().longValue() == max)
return curr; // unchanged
else
return curr.put(node, selfNodeKey, max);
});
replicator.tell(update2, self());
}
private void receiveMaxHeapChanged(Changed<LWWMap<Long>> c) {
maxHeap = c.dataValue().getEntries();
}
private void receiveUsedHeapChanged(Changed<LWWMap<Long>> c) {
Map<String, Double> percentPerNode = new HashMap<>();
for (Map.Entry<String, Long> entry : c.dataValue().getEntries().entrySet()) {
if (maxHeap.containsKey(entry.getKey())) {
double percent = (entry.getValue().doubleValue() / maxHeap.get(entry.getKey())) * 100.0;
percentPerNode.put(entry.getKey(), percent);
}
}
UsedHeap usedHeap = new UsedHeap(percentPerNode);
log.debug("Node {} observed:\n{}", node, usedHeap);
context().system().eventStream().publish(usedHeap);
}
private void receiveMemberUp(Address address) {
nodesInCluster.add(nodeKey(address));
}
private void receiveMemberRemoved(Address address) {
nodesInCluster.remove(nodeKey(address));
if (address.equals(node.selfAddress()))
context().stop(self());
}
private void receiveCleanup() {
Update<LWWMap<Long>> update1 = new Update<>(usedHeapKey, LWWMap.create(), writeLocal(), curr -> cleanup(curr));
replicator.tell(update1, self());
Update<LWWMap<Long>> update2 = new Update<>(maxHeapKey, LWWMap.create(), writeLocal(), curr -> cleanup(curr));
replicator.tell(update2, self());
}
private LWWMap<Long> cleanup(LWWMap<Long> data) {
LWWMap<Long> result = data;
log.info("Cleanup " + nodesInCluster + " -- " + data.getEntries().keySet());
for (String k : data.getEntries().keySet()) {
if (!nodesInCluster.contains(k)) {
result = result.remove(node, k);
}
}
return result;
}
}
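A brief usage sketch (hypothetical caller code, assuming an ActorSystem named system and the same static import of SECONDS as the sample) that starts the actor and listens for the heap reports it publishes, matching how the multi-node spec below exercises it:

    ActorRef replicatedMetrics = system.actorOf(
        ReplicatedMetrics.props(Duration.create(1, SECONDS), Duration.create(3, SECONDS)));
    // UsedHeap events are published to the event stream; subscriberRef is any actor you provide
    system.eventStream().subscribe(subscriberRef, ReplicatedMetrics.UsedHeap.class);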


@@ -0,0 +1,248 @@
package sample.distributeddata;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ddata.DistributedData;
import akka.cluster.ddata.GSet;
import akka.cluster.ddata.GSetKey;
import akka.cluster.ddata.Key;
import akka.cluster.ddata.ORSet;
import akka.cluster.ddata.Replicator;
import akka.cluster.ddata.Replicator.Changed;
import akka.cluster.ddata.Replicator.Subscribe;
import akka.cluster.ddata.Replicator.Update;
import akka.cluster.ddata.Replicator.UpdateResponse;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
@SuppressWarnings("unchecked")
public class ServiceRegistry extends AbstractActor {
/**
* Register a `service` with a `name`. Several services can be registered with
* the same `name`. It will be removed when it is terminated.
*/
public static class Register {
public final String name;
public final ActorRef service;
public Register(String name, ActorRef service) {
this.name = name;
this.service = service;
}
}
/**
* Lookup services registered for a `name`. {@link Bindings} will be sent to
* `sender()`.
*/
public static class Lookup {
public final String name;
public Lookup(String name) {
this.name = name;
}
}
/**
* Reply for {@link Lookup}
*/
public static class Bindings {
public final String name;
public final Set<ActorRef> services;
public Bindings(String name, Set<ActorRef> services) {
this.name = name;
this.services = services;
}
}
/**
* Published to `ActorSystem.eventStream` when services are changed.
*/
public static class BindingChanged {
public final String name;
public final Set<ActorRef> services;
public BindingChanged(String name, Set<ActorRef> services) {
this.name = name;
this.services = services;
}
}
public static class ServiceKey extends Key<ORSet<ActorRef>> {
private static final long serialVersionUID = 1L;
public ServiceKey(String serviceName) {
super(serviceName);
}
}
public static Props props() {
return Props.create(ServiceRegistry.class);
}
private final LoggingAdapter log = Logging.getLogger(context().system(), this);
private final ActorRef replicator = DistributedData.get(context().system()).replicator();
private final Cluster node = Cluster.get(context().system());
private final Key<GSet<ServiceKey>> allServicesKey = GSetKey.create("service-keys");
private Set<ServiceKey> keys = new HashSet<>();
private final Map<String, Set<ActorRef>> services = new HashMap<>();
private boolean leader = false;
public ServiceRegistry() {
receive(matchCommands()
.orElse(matchChanged())
.orElse(matchWatch())
.orElse(matchOther()));
}
@Override
public void preStart() {
replicator.tell(new Subscribe<>(allServicesKey, self()), ActorRef.noSender());
node.subscribe(self(), ClusterEvent.initialStateAsEvents(), ClusterEvent.LeaderChanged.class);
}
@Override
public void postStop() throws Exception {
node.unsubscribe(self());
super.postStop();
}
private PartialFunction<Object, BoxedUnit> matchCommands() {
return ReceiveBuilder
.match(Register.class, r -> receiveRegister(r))
.match(Lookup.class, l -> receiveLookup(l))
.build();
}
private ServiceKey serviceKey(String serviceName) {
return new ServiceKey("service:" + serviceName);
}
private void receiveRegister(Register r) {
ServiceKey dKey = serviceKey(r.name);
// store the service names in a separate GSet to be able to
// get notifications of new names
if (!keys.contains(dKey)) {
Update<GSet<ServiceKey>> update1 = new Update<>(allServicesKey, GSet.create(), Replicator.writeLocal(),
curr -> curr.add(dKey));
replicator.tell(update1, self());
}
Update<ORSet<ActorRef>> update2 = new Update<>(dKey, ORSet.create(), Replicator.writeLocal(),
curr -> curr.add(node, r.service));
replicator.tell(update2, self());
}
private void receiveLookup(Lookup l) {
sender().tell(new Bindings(l.name, services.getOrDefault(l.name, Collections.emptySet())), self());
}
private PartialFunction<Object, BoxedUnit> matchChanged() {
return ReceiveBuilder
.match(Changed.class, c -> {
if (c.key().equals(allServicesKey))
receiveAllServicesKeysChanged((Changed<GSet<ServiceKey>>) c);
else if (c.key() instanceof ServiceKey)
receiveServiceChanged((Changed<ORSet<ActorRef>>) c);
})
.build();
}
private void receiveAllServicesKeysChanged(Changed<GSet<ServiceKey>> c) {
Set<ServiceKey> newKeys = c.dataValue().getElements();
Set<ServiceKey> diff = new HashSet<>(newKeys);
diff.removeAll(keys);
log.debug("Services changed, added: {}, all: {}", diff, newKeys);
diff.forEach(dKey -> {
// subscribe to get notifications of when services with this name are added or removed
replicator.tell(new Subscribe<ORSet<ActorRef>>(dKey, self()), self());
});
keys = newKeys;
}
private void receiveServiceChanged(Changed<ORSet<ActorRef>> c) {
String name = c.key().id().split(":")[1];
Set<ActorRef> newServices = c.get(serviceKey(name)).getElements();
log.debug("Services changed for name [{}]: {}", name, newServices);
services.put(name, newServices);
context().system().eventStream().publish(new BindingChanged(name, newServices));
if (leader) {
newServices.forEach(ref -> context().watch(ref)); // watch is idempotent
}
}
private PartialFunction<Object, BoxedUnit> matchWatch() {
return ReceiveBuilder
.match(ClusterEvent.LeaderChanged.class, c -> c.getLeader() != null,
c -> receiveLeaderChanged(c.getLeader()))
.match(Terminated.class, t -> receiveTerminated(t.actor()))
.build();
}
private void receiveLeaderChanged(Address newLeader) {
// Let one node (the leader) be responsible for removal of terminated services
// to avoid redundant work and too many death watch notifications.
// It is not critical to only do it from one node.
boolean wasLeader = leader;
leader = newLeader.equals(node.selfAddress());
// when used with many (> 500) services you must increase the system message buffer
// `akka.remote.system-message-buffer-size`
if (!wasLeader && leader) {
for (Set<ActorRef> refs : services.values()) {
for (ActorRef ref : refs) {
context().watch(ref);
}
}
} else if (wasLeader && !leader) {
for (Set<ActorRef> refs : services.values()) {
for (ActorRef ref : refs) {
context().unwatch(ref);
}
}
}
}
private void receiveTerminated(ActorRef ref) {
for (Map.Entry<String, Set<ActorRef>> entry : services.entrySet()) {
if (entry.getValue().contains(ref)) {
log.debug("Service with name [{}] terminated: {}", entry.getKey(), ref);
ServiceKey dKey = serviceKey(entry.getKey());
Update<ORSet<ActorRef>> update = new Update<>(dKey, ORSet.create(), Replicator.writeLocal(),
curr -> curr.remove(node, ref));
replicator.tell(update, self());
}
}
}
private PartialFunction<Object, BoxedUnit> matchOther() {
return ReceiveBuilder
.match(UpdateResponse.class, u -> {
// ok
})
.build();
}
}
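A brief usage sketch (hypothetical caller code from inside another actor) of the protocol described in the comments above:

    ActorRef registry = context().system().actorOf(ServiceRegistry.props());
    // serviceRef is some actor providing the service; it is removed from the
    // registry when it terminates
    registry.tell(new ServiceRegistry.Register("a", serviceRef), ActorRef.noSender());
    // the Bindings reply of a Lookup is sent back to the sender
    registry.tell(new ServiceRegistry.Lookup("a"), self());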


@@ -0,0 +1,277 @@
package sample.distributeddata;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import scala.PartialFunction;
import scala.concurrent.duration.Duration;
import scala.runtime.BoxedUnit;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ddata.DistributedData;
import akka.cluster.ddata.Key;
import akka.cluster.ddata.LWWMap;
import akka.cluster.ddata.LWWMapKey;
import akka.cluster.ddata.Replicator;
import akka.cluster.ddata.Replicator.GetFailure;
import akka.cluster.ddata.Replicator.GetResponse;
import akka.cluster.ddata.Replicator.GetSuccess;
import akka.cluster.ddata.Replicator.NotFound;
import akka.cluster.ddata.Replicator.ReadConsistency;
import akka.cluster.ddata.Replicator.ReadMajority;
import akka.cluster.ddata.Replicator.Update;
import akka.cluster.ddata.Replicator.UpdateFailure;
import akka.cluster.ddata.Replicator.UpdateSuccess;
import akka.cluster.ddata.Replicator.UpdateTimeout;
import akka.cluster.ddata.Replicator.WriteConsistency;
import akka.cluster.ddata.Replicator.WriteMajority;
import akka.japi.pf.ReceiveBuilder;
@SuppressWarnings("unchecked")
public class ShoppingCart extends AbstractActor {
//#read-write-majority
private final WriteConsistency writeMajority =
new WriteMajority(Duration.create(3, SECONDS));
private final static ReadConsistency readMajority =
new ReadMajority(Duration.create(3, SECONDS));
//#read-write-majority
public static final String GET_CART = "getCart";
public static class AddItem {
public final LineItem item;
public AddItem(LineItem item) {
this.item = item;
}
}
public static class RemoveItem {
public final String productId;
public RemoveItem(String productId) {
this.productId = productId;
}
}
public static class Cart {
public final Set<LineItem> items;
public Cart(Set<LineItem> items) {
this.items = items;
}
}
public static class LineItem implements Serializable {
private static final long serialVersionUID = 1L;
public final String productId;
public final String title;
public final int quantity;
public LineItem(String productId, String title, int quantity) {
this.productId = productId;
this.title = title;
this.quantity = quantity;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((productId == null) ? 0 : productId.hashCode());
result = prime * result + quantity;
result = prime * result + ((title == null) ? 0 : title.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
LineItem other = (LineItem) obj;
if (productId == null) {
if (other.productId != null)
return false;
} else if (!productId.equals(other.productId))
return false;
if (quantity != other.quantity)
return false;
if (title == null) {
if (other.title != null)
return false;
} else if (!title.equals(other.title))
return false;
return true;
}
@Override
public String toString() {
return "LineItem [productId=" + productId + ", title=" + title + ", quantity=" + quantity + "]";
}
}
public static Props props(String userId) {
return Props.create(ShoppingCart.class, userId);
}
private final ActorRef replicator = DistributedData.get(context().system()).replicator();
private final Cluster node = Cluster.get(context().system());
@SuppressWarnings("unused")
private final String userId;
private final Key<LWWMap<LineItem>> dataKey;
public ShoppingCart(String userId) {
this.userId = userId;
this.dataKey = LWWMapKey.create("cart-" + userId);
receive(matchGetCart()
.orElse(matchAddItem())
.orElse(matchRemoveItem())
.orElse(matchOther()));
}
//#get-cart
private PartialFunction<Object, BoxedUnit> matchGetCart() {
return ReceiveBuilder
.matchEquals((GET_CART),
s -> receiveGetCart())
.match(GetSuccess.class, g -> isResponseToGetCart(g),
g -> receiveGetSuccess((GetSuccess<LWWMap<LineItem>>) g))
.match(NotFound.class, n -> isResponseToGetCart(n),
n -> receiveNotFound((NotFound<LWWMap<LineItem>>) n))
.match(GetFailure.class, f -> isResponseToGetCart(f),
f -> receiveGetFailure((GetFailure<LWWMap<LineItem>>) f))
.build();
}
private void receiveGetCart() {
Optional<Object> ctx = Optional.of(sender());
replicator.tell(new Replicator.Get<LWWMap<LineItem>>(dataKey, readMajority, ctx),
self());
}
private boolean isResponseToGetCart(GetResponse<?> response) {
return response.key().equals(dataKey) &&
(response.getRequest().orElse(null) instanceof ActorRef);
}
private void receiveGetSuccess(GetSuccess<LWWMap<LineItem>> g) {
Set<LineItem> items = new HashSet<>(g.dataValue().getEntries().values());
ActorRef replyTo = (ActorRef) g.getRequest().get();
replyTo.tell(new Cart(items), self());
}
private void receiveNotFound(NotFound<LWWMap<LineItem>> n) {
ActorRef replyTo = (ActorRef) n.getRequest().get();
replyTo.tell(new Cart(new HashSet<>()), self());
}
private void receiveGetFailure(GetFailure<LWWMap<LineItem>> f) {
// ReadMajority failure, try again with local read
Optional<Object> ctx = Optional.of(sender());
replicator.tell(new Replicator.Get<LWWMap<LineItem>>(dataKey, Replicator.readLocal(),
ctx), self());
}
//#get-cart
//#add-item
private PartialFunction<Object, BoxedUnit> matchAddItem() {
return ReceiveBuilder
.match(AddItem.class, r -> receiveAddItem(r))
.build();
}
private void receiveAddItem(AddItem add) {
Update<LWWMap<LineItem>> update = new Update<>(dataKey, LWWMap.create(), writeMajority,
cart -> updateCart(cart, add.item));
replicator.tell(update, self());
}
//#add-item
private LWWMap<LineItem> updateCart(LWWMap<LineItem> data, LineItem item) {
if (data.contains(item.productId)) {
LineItem existingItem = data.get(item.productId).get();
int newQuantity = existingItem.quantity + item.quantity;
LineItem newItem = new LineItem(item.productId, item.title, newQuantity);
return data.put(node, item.productId, newItem);
} else {
return data.put(node, item.productId, item);
}
}
private PartialFunction<Object, BoxedUnit> matchRemoveItem() {
return ReceiveBuilder
.match(RemoveItem.class, r -> receiveRemoveItem(r))
.match(GetSuccess.class, g -> isResponseToRemoveItem(g),
g -> receiveRemoveItemGetSuccess((GetSuccess<LWWMap<LineItem>>) g))
.match(GetFailure.class, f -> isResponseToRemoveItem(f),
f -> receiveRemoveItemGetFailure((GetFailure<LWWMap<LineItem>>) f))
.match(NotFound.class, n -> isResponseToRemoveItem(n), n -> {/* nothing to remove */})
.build();
}
//#remove-item
private void receiveRemoveItem(RemoveItem rm) {
// Try to fetch latest from a majority of nodes first, since ORMap
// remove must have seen the item to be able to remove it.
Optional<Object> ctx = Optional.of(rm);
replicator.tell(new Replicator.Get<LWWMap<LineItem>>(dataKey, readMajority, ctx),
self());
}
private void receiveRemoveItemGetSuccess(GetSuccess<LWWMap<LineItem>> g) {
RemoveItem rm = (RemoveItem) g.getRequest().get();
removeItem(rm.productId);
}
private void receiveRemoveItemGetFailure(GetFailure<LWWMap<LineItem>> f) {
// ReadMajority failed, fall back to best effort local value
RemoveItem rm = (RemoveItem) f.getRequest().get();
removeItem(rm.productId);
}
private void removeItem(String productId) {
Update<LWWMap<LineItem>> update = new Update<>(dataKey, LWWMap.create(), writeMajority,
cart -> cart.remove(node, productId));
replicator.tell(update, self());
}
private boolean isResponseToRemoveItem(GetResponse<?> response) {
return response.key().equals(dataKey) &&
(response.getRequest().orElse(null) instanceof RemoveItem);
}
//#remove-item
private PartialFunction<Object, BoxedUnit> matchOther() {
return ReceiveBuilder
.match(UpdateSuccess.class, u -> {
// ok
})
.match(UpdateTimeout.class, t -> {
// will eventually be replicated
})
.match(UpdateFailure.class, f -> {
throw new IllegalStateException("Unexpected failure: " + f);
})
.build();
}
}


@@ -0,0 +1,149 @@
package sample.distributeddata;
import java.util.Optional;
import java.util.HashMap;
import java.math.BigInteger;
import java.util.Map;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import scala.concurrent.duration.Duration;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.cluster.Cluster;
import akka.cluster.ddata.*;
import akka.japi.pf.ReceiveBuilder;
import static akka.cluster.ddata.Replicator.*;
import static java.util.concurrent.TimeUnit.SECONDS;
@SuppressWarnings("unchecked")
public class VotingService extends AbstractActor {
public static final String OPEN = "open";
public static final String CLOSE = "close";
public static final String GET_VOTES = "getVotes";
public static class Votes {
public final Map<String, BigInteger> result;
public final boolean open;
public Votes(Map<String, BigInteger> result, boolean open) {
this.result = result;
this.open = open;
}
}
public static class Vote {
public final String participant;
public Vote(String participant) {
this.participant = participant;
}
}
private final ActorRef replicator = DistributedData.get(context().system()).replicator();
private final Cluster node = Cluster.get(context().system());
private final Key<Flag> openedKey = FlagKey.create("contestOpened");
private final Key<Flag> closedKey = FlagKey.create("contestClosed");
private final Key<PNCounterMap> countersKey = PNCounterMapKey.create("contestCounters");
private final WriteConsistency writeAll = new WriteAll(Duration.create(5, SECONDS));
private final ReadConsistency readAll = new ReadAll(Duration.create(3, SECONDS));
@Override
public void preStart() {
replicator.tell(new Subscribe<>(openedKey, self()), ActorRef.noSender());
}
public VotingService() {
receive(ReceiveBuilder
.matchEquals(OPEN, cmd -> receiveOpen())
.match(Changed.class, c -> c.key().equals(openedKey), c -> receiveOpenedChanged((Changed<Flag>) c))
.matchEquals(GET_VOTES, cmd -> receiveGetVotesEmpty())
.build());
}
private void receiveOpen() {
Update<Flag> update = new Update<>(openedKey, Flag.create(), writeAll, curr -> curr.switchOn());
replicator.tell(update, self());
becomeOpen();
}
private void becomeOpen() {
replicator.tell(new Unsubscribe<>(openedKey, self()), ActorRef.noSender());
replicator.tell(new Subscribe<>(closedKey, self()), ActorRef.noSender());
context().become(matchOpen().orElse(matchGetVotes(true)));
}
private void receiveOpenedChanged(Changed<Flag> c) {
if (c.dataValue().enabled())
becomeOpen();
}
private void receiveGetVotesEmpty() {
sender().tell(new Votes(new HashMap<>(), false), self());
}
private PartialFunction<Object, BoxedUnit> matchOpen() {
return ReceiveBuilder
.match(Vote.class, vote -> receiveVote(vote))
.match(UpdateSuccess.class, u -> receiveUpdateSuccess())
.matchEquals(CLOSE, cmd -> receiveClose())
.match(Changed.class, c -> c.key().equals(closedKey), c -> receiveClosedChanged((Changed<Flag>) c))
.build();
}
private void receiveVote(Vote vote) {
Update<PNCounterMap> update = new Update<>(countersKey, PNCounterMap.create(), Replicator.writeLocal(),
curr -> curr.increment(node, vote.participant, 1));
replicator.tell(update, self());
}
private void receiveUpdateSuccess() {
// ok
}
private void receiveClose() {
Update<Flag> update = new Update<>(closedKey, Flag.create(), writeAll, curr -> curr.switchOn());
replicator.tell(update, self());
context().become(matchGetVotes(false));
}
private void receiveClosedChanged(Changed<Flag> c) {
if (c.dataValue().enabled())
context().become(matchGetVotes(false));
}
private PartialFunction<Object, BoxedUnit> matchGetVotes(boolean open) {
return ReceiveBuilder
.matchEquals(GET_VOTES, s -> receiveGetVotes())
.match(NotFound.class, n -> n.key().equals(countersKey), n -> receiveNotFound(open, (NotFound<PNCounterMap>) n))
.match(GetSuccess.class, g -> g.key().equals(countersKey),
g -> receiveGetSuccess(open, (GetSuccess<PNCounterMap>) g))
.match(GetFailure.class, f -> f.key().equals(countersKey), f -> receiveGetFailure())
.match(UpdateSuccess.class, u -> receiveUpdateSuccess()).build();
}
private void receiveGetVotes() {
Optional<Object> ctx = Optional.of(sender());
replicator.tell(new Replicator.Get<PNCounterMap>(countersKey, readAll, ctx), self());
}
private void receiveGetSuccess(boolean open, GetSuccess<PNCounterMap> g) {
Map<String, BigInteger> result = g.dataValue().getEntries();
ActorRef replyTo = (ActorRef) g.getRequest().get();
replyTo.tell(new Votes(result, open), self());
}
private void receiveNotFound(boolean open, NotFound<PNCounterMap> n) {
ActorRef replyTo = (ActorRef) n.getRequest().get();
replyTo.tell(new Votes(new HashMap<>(), open), self());
}
private void receiveGetFailure() {
// skip
}
}
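A brief usage sketch (hypothetical caller code from inside another actor), using the messages defined above:

    ActorRef votingService = context().system().actorOf(Props.create(VotingService.class));
    votingService.tell(VotingService.OPEN, ActorRef.noSender());
    votingService.tell(new VotingService.Vote("participant-1"), ActorRef.noSender());
    // the Votes reply is sent back to the sender of GET_VOTES
    votingService.tell(VotingService.GET_VOTES, self());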


@@ -0,0 +1,21 @@
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
auto-down-unreachable-after = 10s
}
}


@@ -0,0 +1,135 @@
package sample.distributeddata
import java.util.Optional;
import scala.concurrent.duration._
import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.Replicator.GetReplicaCount
import akka.cluster.ddata.Replicator.ReplicaCount
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
object ReplicatedCacheSpec extends MultiNodeConfig {
val node1 = role("node-1")
val node2 = role("node-2")
val node3 = role("node-3")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.log-dead-letters-during-shutdown = off
"""))
}
class ReplicatedCacheSpecMultiJvmNode1 extends ReplicatedCacheSpec
class ReplicatedCacheSpecMultiJvmNode2 extends ReplicatedCacheSpec
class ReplicatedCacheSpecMultiJvmNode3 extends ReplicatedCacheSpec
class ReplicatedCacheSpec extends MultiNodeSpec(ReplicatedCacheSpec) with STMultiNodeSpec with ImplicitSender {
import ReplicatedCacheSpec._
import ReplicatedCache._
override def initialParticipants = roles.size
val cluster = Cluster(system)
val replicatedCache = system.actorOf(ReplicatedCache.props)
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster join node(to).address
}
enterBarrier(from.name + "-joined")
}
"Demo of a replicated cache" must {
"join cluster" in within(10.seconds) {
join(node1, node1)
join(node2, node1)
join(node3, node1)
awaitAssert {
DistributedData(system).replicator ! GetReplicaCount
expectMsg(ReplicaCount(roles.size))
}
enterBarrier("after-1")
}
"replicate cached entry" in within(10.seconds) {
runOn(node1) {
replicatedCache ! new PutInCache("key1", "A")
}
awaitAssert {
val probe = TestProbe()
replicatedCache.tell(new GetFromCache("key1"), probe.ref)
probe.expectMsg(new Cached("key1", Optional.of("A")))
}
enterBarrier("after-2")
}
"replicate many cached entries" in within(10.seconds) {
runOn(node1) {
for (i ← 100 to 200)
replicatedCache ! new PutInCache("key" + i, i)
}
awaitAssert {
val probe = TestProbe()
for (i ← 100 to 200) {
replicatedCache.tell(new GetFromCache("key" + i), probe.ref)
probe.expectMsg(new Cached("key" + i, Optional.of(Integer.valueOf(i))))
}
}
enterBarrier("after-3")
}
"replicate evicted entry" in within(15.seconds) {
runOn(node1) {
replicatedCache ! new PutInCache("key2", "B")
}
awaitAssert {
val probe = TestProbe()
replicatedCache.tell(new GetFromCache("key2"), probe.ref)
probe.expectMsg(new Cached("key2", Optional.of("B")))
}
enterBarrier("key2-replicated")
runOn(node3) {
replicatedCache ! new Evict("key2")
}
awaitAssert {
val probe = TestProbe()
replicatedCache.tell(new GetFromCache("key2"), probe.ref)
probe.expectMsg(new Cached("key2", Optional.empty()))
}
enterBarrier("after-4")
}
"replicate updated cached entry" in within(10.seconds) {
runOn(node2) {
replicatedCache ! new PutInCache("key1", "A2")
replicatedCache ! new PutInCache("key1", "A3")
}
awaitAssert {
val probe = TestProbe()
replicatedCache.tell(new GetFromCache("key1"), probe.ref)
probe.expectMsg(new Cached("key1", Optional.of("A3")))
}
enterBarrier("after-5")
}
}
}


@@ -0,0 +1,92 @@
package sample.distributeddata
import scala.concurrent.duration._
import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.Replicator.GetReplicaCount
import akka.cluster.ddata.Replicator.ReplicaCount
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._
object ReplicatedMetricsSpec extends MultiNodeConfig {
val node1 = role("node-1")
val node2 = role("node-2")
val node3 = role("node-3")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.log-dead-letters-during-shutdown = off
"""))
}
class ReplicatedMetricsSpecMultiJvmNode1 extends ReplicatedMetricsSpec
class ReplicatedMetricsSpecMultiJvmNode2 extends ReplicatedMetricsSpec
class ReplicatedMetricsSpecMultiJvmNode3 extends ReplicatedMetricsSpec
class ReplicatedMetricsSpec extends MultiNodeSpec(ReplicatedMetricsSpec) with STMultiNodeSpec with ImplicitSender {
import ReplicatedMetricsSpec._
import ReplicatedMetrics._
override def initialParticipants = roles.size
val cluster = Cluster(system)
val replicatedMetrics = system.actorOf(ReplicatedMetrics.props(1.second, 3.seconds))
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster join node(to).address
}
enterBarrier(from.name + "-joined")
}
"Demo of a replicated metrics" must {
"join cluster" in within(10.seconds) {
join(node1, node1)
join(node2, node1)
join(node3, node1)
awaitAssert {
DistributedData(system).replicator ! GetReplicaCount
expectMsg(ReplicaCount(roles.size))
}
enterBarrier("after-1")
}
"replicate metrics" in within(10.seconds) {
val probe = TestProbe()
system.eventStream.subscribe(probe.ref, classOf[UsedHeap])
awaitAssert {
probe.expectMsgType[UsedHeap](1.second).percentPerNode.size should be(3)
}
probe.expectMsgType[UsedHeap].percentPerNode.size should be(3)
probe.expectMsgType[UsedHeap].percentPerNode.size should be(3)
enterBarrier("after-2")
}
"cleanup removed node" in within(25.seconds) {
val node3Address = node(node3).address
runOn(node1) {
cluster.leave(node3Address)
}
runOn(node1, node2) {
val probe = TestProbe()
system.eventStream.subscribe(probe.ref, classOf[UsedHeap])
awaitAssert {
probe.expectMsgType[UsedHeap](1.second).percentPerNode.size should be(2)
}
probe.expectMsgType[UsedHeap].percentPerNode.asScala.toMap should not contain (
nodeKey(node3Address))
}
enterBarrier("after-3")
}
}
}


@@ -0,0 +1,17 @@
package sample.distributeddata
import akka.remote.testkit.MultiNodeSpecCallbacks
import org.scalatest.{ BeforeAndAfterAll, WordSpecLike }
import org.scalatest.Matchers
/**
* Hooks up MultiNodeSpec with ScalaTest
*/
trait STMultiNodeSpec extends MultiNodeSpecCallbacks
with WordSpecLike with Matchers with BeforeAndAfterAll {
override def beforeAll() = multiNodeSpecBeforeAll()
override def afterAll() = multiNodeSpecAfterAll()
}


@@ -0,0 +1,142 @@
package sample.distributeddata
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.PoisonPill
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.Replicator.GetReplicaCount
import akka.cluster.ddata.Replicator.ReplicaCount
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._
object ServiceRegistrySpec extends MultiNodeConfig {
val node1 = role("node-1")
val node2 = role("node-2")
val node3 = role("node-3")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.log-dead-letters-during-shutdown = off
"""))
class Service extends Actor {
def receive = {
case s: String ⇒ sender() ! self.path.name + ": " + s
}
}
}
class ServiceRegistrySpecMultiJvmNode1 extends ServiceRegistrySpec
class ServiceRegistrySpecMultiJvmNode2 extends ServiceRegistrySpec
class ServiceRegistrySpecMultiJvmNode3 extends ServiceRegistrySpec
class ServiceRegistrySpec extends MultiNodeSpec(ServiceRegistrySpec) with STMultiNodeSpec with ImplicitSender {
import ServiceRegistrySpec._
import ServiceRegistry._
override def initialParticipants = roles.size
val cluster = Cluster(system)
val registry = system.actorOf(ServiceRegistry.props)
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster join node(to).address
}
enterBarrier(from.name + "-joined")
}
"Demo of a replicated service registry" must {
"join cluster" in within(10.seconds) {
join(node1, node1)
join(node2, node1)
join(node3, node1)
awaitAssert {
DistributedData(system).replicator ! GetReplicaCount
expectMsg(ReplicaCount(roles.size))
}
enterBarrier("after-1")
}
"replicate service entry" in within(10.seconds) {
runOn(node1) {
val a1 = system.actorOf(Props[Service], name = "a1")
registry ! new Register("a", a1)
}
awaitAssert {
val probe = TestProbe()
registry.tell(new Lookup("a"), probe.ref)
probe.expectMsgType[Bindings].services.asScala.map(_.path.name).toSet should be(Set("a1"))
}
enterBarrier("after-2")
}
"replicate updated service entry, and publish to even bus" in {
val probe = TestProbe()
system.eventStream.subscribe(probe.ref, classOf[BindingChanged])
runOn(node2) {
val a2 = system.actorOf(Props[Service], name = "a2")
registry ! new Register("a", a2)
}
probe.within(10.seconds) {
probe.expectMsgType[BindingChanged].services.asScala.map(_.path.name).toSet should be(Set("a1", "a2"))
registry.tell(new Lookup("a"), probe.ref)
probe.expectMsgType[Bindings].services.asScala.map(_.path.name).toSet should be(Set("a1", "a2"))
}
enterBarrier("after-4")
}
"remove terminated service" in {
val probe = TestProbe()
system.eventStream.subscribe(probe.ref, classOf[BindingChanged])
runOn(node2) {
registry.tell(new Lookup("a"), probe.ref)
val a2 = probe.expectMsgType[Bindings].services.asScala.find(_.path.name == "a2").get
a2 ! PoisonPill
}
probe.within(10.seconds) {
probe.expectMsgType[BindingChanged].services.asScala.map(_.path.name).toSet should be(Set("a1"))
registry.tell(new Lookup("a"), probe.ref)
probe.expectMsgType[Bindings].services.asScala.map(_.path.name).toSet should be(Set("a1"))
}
enterBarrier("after-5")
}
"replicate many service entries" in within(10.seconds) {
for (i ← 100 until 200) {
val service = system.actorOf(Props[Service], name = myself.name + "_" + i)
registry ! new Register("a" + i, service)
}
awaitAssert {
val probe = TestProbe()
for (i ← 100 until 200) {
registry.tell(new Lookup("a" + i), probe.ref)
probe.expectMsgType[Bindings].services.asScala.map(_.path.name).toSet should be(roles.map(_.name + "_" + i).toSet)
}
}
enterBarrier("after-6")
}
}
}


@@ -0,0 +1,101 @@
package sample.distributeddata
import scala.concurrent.duration._
import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.Replicator.GetReplicaCount
import akka.cluster.ddata.Replicator.ReplicaCount
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._
object ShoppingCartSpec extends MultiNodeConfig {
val node1 = role("node-1")
val node2 = role("node-2")
val node3 = role("node-3")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.log-dead-letters-during-shutdown = off
"""))
}
class ShoppingCartSpecMultiJvmNode1 extends ShoppingCartSpec
class ShoppingCartSpecMultiJvmNode2 extends ShoppingCartSpec
class ShoppingCartSpecMultiJvmNode3 extends ShoppingCartSpec
class ShoppingCartSpec extends MultiNodeSpec(ShoppingCartSpec) with STMultiNodeSpec with ImplicitSender {
import ShoppingCartSpec._
import ShoppingCart._
override def initialParticipants = roles.size
val cluster = Cluster(system)
val shoppingCart = system.actorOf(ShoppingCart.props("user-1"))
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster join node(to).address
}
enterBarrier(from.name + "-joined")
}
"Demo of a replicated shopping cart" must {
"join cluster" in within(10.seconds) {
join(node1, node1)
join(node2, node1)
join(node3, node1)
awaitAssert {
DistributedData(system).replicator ! GetReplicaCount
expectMsg(ReplicaCount(roles.size))
}
enterBarrier("after-1")
}
"handle updates directly after start" in within(15.seconds) {
runOn(node2) {
shoppingCart ! new ShoppingCart.AddItem(new LineItem("1", "Apples", 2))
shoppingCart ! new ShoppingCart.AddItem(new LineItem("2", "Oranges", 3))
}
enterBarrier("updates-done")
awaitAssert {
shoppingCart ! ShoppingCart.GET_CART
val cart = expectMsgType[Cart]
cart.items.asScala.toSet should be(Set(
new LineItem("1", "Apples", 2), new LineItem("2", "Oranges", 3)))
}
enterBarrier("after-2")
}
"handle updates from different nodes" in within(5.seconds) {
runOn(node2) {
shoppingCart ! new ShoppingCart.AddItem(new LineItem("1", "Apples", 5))
shoppingCart ! new ShoppingCart.RemoveItem("2")
}
runOn(node3) {
shoppingCart ! new ShoppingCart.AddItem(new LineItem("3", "Bananas", 4))
}
enterBarrier("updates-done")
awaitAssert {
shoppingCart ! ShoppingCart.GET_CART
val cart = expectMsgType[Cart]
cart.items.asScala.toSet should be(
Set(new LineItem("1", "Apples", 7), new LineItem("3", "Bananas", 4)))
}
enterBarrier("after-3")
}
}
}

View file

@@ -0,0 +1,101 @@
package sample.distributeddata
import java.math.BigInteger
import scala.concurrent.duration._
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.Replicator.GetReplicaCount
import akka.cluster.ddata.Replicator.ReplicaCount
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
object VotingServiceSpec extends MultiNodeConfig {
val node1 = role("node-1")
val node2 = role("node-2")
val node3 = role("node-3")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.log-dead-letters-during-shutdown = off
"""))
}
class VotingServiceSpecMultiJvmNode1 extends VotingServiceSpec
class VotingServiceSpecMultiJvmNode2 extends VotingServiceSpec
class VotingServiceSpecMultiJvmNode3 extends VotingServiceSpec
class VotingServiceSpec extends MultiNodeSpec(VotingServiceSpec) with STMultiNodeSpec with ImplicitSender {
import VotingServiceSpec._
override def initialParticipants = roles.size
val cluster = Cluster(system)
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster join node(to).address
}
enterBarrier(from.name + "-joined")
}
"Demo of a replicated voting" must {
"join cluster" in within(10.seconds) {
join(node1, node1)
join(node2, node1)
join(node3, node1)
awaitAssert {
DistributedData(system).replicator ! GetReplicaCount
expectMsg(ReplicaCount(roles.size))
}
enterBarrier("after-1")
}
"count votes correctly" in within(15.seconds) {
import VotingService._
val votingService = system.actorOf(Props[VotingService], "votingService")
val N = 1000
runOn(node1) {
votingService ! VotingService.OPEN
for (n ← 1 to N) {
votingService ! new Vote("#" + ((n % 20) + 1))
}
}
runOn(node2, node3) {
// wait for it to open
val p = TestProbe()
awaitAssert {
votingService.tell(VotingService.GET_VOTES, p.ref)
p.expectMsgType[Votes](3.seconds).open should be(true)
}
for (n ← 1 to N) {
votingService ! new Vote("#" + ((n % 20) + 1))
}
}
enterBarrier("voting-done")
runOn(node3) {
votingService ! VotingService.CLOSE
}
val expected = (1 to 20).map(n "#" + n -> BigInteger.valueOf(3L * N / 20)).toMap
awaitAssert {
votingService ! VotingService.GET_VOTES
val votes = expectMsgType[Votes](3.seconds)
votes.open should be (false)
import scala.collection.JavaConverters._
votes.result.asScala.toMap should be (expected)
}
enterBarrier("after-2")
}
}
}

View file

@@ -0,0 +1,306 @@
<!-- <html> -->
<head>
<title>Akka Distributed Data Samples with Java</title>
</head>
<body>
<div>
<p>
This tutorial contains 5 samples illustrating how to use
<a href="http://doc.akka.io/docs/akka/2.4-SNAPSHOT/java/distributed-data.html" target="_blank">Akka Distributed Data</a>.
</p>
<ul>
<li>Low Latency Voting Service</li>
<li>Highly Available Shopping Cart</li>
<li>Distributed Service Registry</li>
<li>Replicated Cache</li>
<li>Replicated Metrics</li>
</ul>
<p>
<b>Akka Distributed Data</b> is useful when you need to share data between nodes in an
Akka Cluster. The data is accessed with an actor providing a key-value store like API.
The keys are unique identifiers with type information of the data values. The values
are <i>Conflict Free Replicated Data Types</i> (CRDTs).
</p>
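<p>
A minimal sketch of that API (assuming an <code>ActorSystem</code> called <code>system</code>; the
key id <code>"counters"</code> is chosen here only for illustration):
</p>
<pre><code>
// the replicator actor is the entry point to Distributed Data
ActorRef replicator = DistributedData.get(system).replicator();
// a key carries both the identifier and the CRDT type of the value
Key&lt;PNCounterMap&gt; countersKey = PNCounterMapKey.create("counters");
</code></pre>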
<p>
All data entries are spread to all nodes, or nodes with a certain role, in the cluster
via direct replication and gossip based dissemination. You have fine grained control
of the consistency level for reads and writes.
</p>
<p>
The nature of CRDTs makes it possible to perform updates from any node without coordination.
Concurrent updates from different nodes will automatically be resolved by the monotonic
merge function, which all data types must provide. The state changes always converge.
Several useful data types for counters, sets, maps and registers are provided and
you can also implement your own custom data types.
</p>
<p>
It is eventually consistent and geared toward providing high read and write availability
(partition tolerance), with low latency. Note that in an eventually consistent system a read may return an
out-of-date value.
</p>
<p>
Note that there are some
<a href="http://doc.akka.io/docs/akka/2.4-SNAPSHOT/java/distributed-data.html#Limitations" target="_blank">Limitations</a>
that you should be aware of. For example, Akka Distributed Data is not intended for <i>Big Data</i>.
</p>
</div>
<div>
<h2>Low Latency Voting Service</h2>
<p>
Distributed Data is great for low latency services, since you can update or get data from the local replica
without immediate communication with other nodes.
</p>
<p>
Open <a href="#code/src/main/java/sample/distributeddata/VotingService.java" class="shortcut">VotingService.java</a>.
</p>
<p>
<code>VotingService</code> is an actor for low latency counting of votes on several cluster nodes and aggregation
of the grand total number of votes. The actor is started on each cluster node. First it expects an
<code>OPEN</code> message on one or several nodes. After that the counting can begin. The open
signal is immediately replicated to all nodes with a boolean
<a href="http://doc.akka.io/docs/akka/2.4-SNAPSHOT/java/distributed-data.html#Flags_and_Registers" target="_blank">Flag</a>.
Note <code>writeAll</code>.
</p>
<pre><code>
Update&lt;Flag&gt; update = new Update&lt;&gt;(openedKey, Flag.create(), writeAll, curr -> curr.switchOn());
</code></pre>
<p>
The actor subscribes to changes of the <code>OpenedKey</code>, so other instances of this actor,
also on other nodes, are notified when the flag is changed.
</p>
<pre><code>
replicator.tell(new Subscribe&lt;&gt;(openedKey, self()), ActorRef.noSender());
</code></pre>
<pre><code>
.match(Changed.class, c -> c.key().equals(openedKey), c -> receiveOpenedChanged((Changed&lt;Flag&gt;) c))
</code></pre>
<p>
The counters are kept in a
<a href="http://doc.akka.io/docs/akka/2.4-SNAPSHOT/java/distributed-data.html#Counters" target="_blank">PNCounterMap</a>
and updated with:
</p>
<pre><code>
Update&lt;PNCounterMap&gt; update = new Update&lt;&gt;(countersKey, PNCounterMap.create(), Replicator.writeLocal(),
curr -> curr.increment(node, vote.participant, 1));
replicator.tell(update, self());
</code></pre>
<p>
Incrementing the counter is very fast, since it only involves communication with the local
<code>Replicator</code> actor. Note <code>writeLocal</code>. Those updates are also spread
to other nodes, but that is performed in the background.
</p>
<p>
The total number of votes is retrieved with:
</p>
<pre><code>
Optional&lt;Object&gt; ctx = Optional.of(sender());
replicator.tell(new Replicator.Get&lt;PNCounterMap&gt;(countersKey, readAll, ctx), self());
</code></pre>
<pre><code>
.match(GetSuccess.class, g -> g.key().equals(countersKey),
g -> receiveGetSuccess(open, (GetSuccess&lt;PNCounterMap&gt;) g))
</code></pre>
<pre><code>
private void receiveGetSuccess(boolean open, GetSuccess&lt;PNCounterMap&gt; g) {
Map&lt;String, BigInteger&gt; result = g.dataValue().getEntries();
ActorRef replyTo = (ActorRef) g.getRequest().get();
replyTo.tell(new Votes(result, open), self());
}
</code></pre>
<p>
The multi-node test for the <code>VotingService</code> can be found in
<a href="#code/src/multi-jvm/scala/sample/distributeddata/VotingServiceSpec.scala" class="shortcut">VotingServiceSpec.scala</a>.
</p>
<p>
Read the
<a href="http://doc.akka.io/docs/akka/2.4-SNAPSHOT/java/distributed-data.html#Using_the_Replicator" target="_blank">Using the Replicator</a>
documentation for more details of how to use <code>Get</code>, <code>Update</code>, and <code>Subscribe</code>.
</p>
</div>
<div>
<h2>Highly Available Shopping Cart</h2>
<p>
Distributed Data is great for highly available services, since it is possible to perform
updates to the local node (or currently available nodes) during a network partition.
</p>
<p>
Open <a href="#code/src/main/java/sample/distributeddata/ShoppingCart.java" class="shortcut">ShoppingCart.java</a>.
</p>
<p>
<code>ShoppingCart</code> is an actor that holds the selected items to buy for a user.
The actor instance for a specific user may be started wherever needed in the cluster, i.e. several
instances may be started on different nodes and used at the same time.
</p>
<p>
Each product in the cart is represented by a <code>LineItem</code> and all items in the cart
are collected in a <a href="http://doc.akka.io/docs/akka/2.4-SNAPSHOT/java/distributed-data.html#Maps" target="_blank">LWWMap</a>.
</p>
<p>
The actor handles the commands <code>GET_CART</code>, <code>AddItem</code> and <code>RemoveItem</code>.
To get the latest updates in case the same shopping cart is used from several nodes, it uses the
consistency levels <code>readMajority</code> and <code>writeMajority</code>, but that is only
done to reduce the risk of seeing old data. If such reads and writes cannot be completed due to a
network partition it falls back to reading/writing from the local replica (see <code>GetFailure</code>).
Local reads and writes will always be successful and when the network partition heals the updated
shopping carts will be disseminated by the
<a href="https://en.wikipedia.org/wiki/Gossip_protocol" target="_blank">gossip protocol</a>
and the <code>LWWMap</code> CRDTs are merged, i.e. it is a highly available shopping cart.
</p>
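<p>
A sketch of such an update with <code>writeMajority</code> (the key, the timeout and the
<code>getProductId</code> accessor are illustrative assumptions, not necessarily what the sample uses;
<code>node</code> is the <code>Cluster</code> extension):
</p>
<pre><code>
Replicator.WriteConsistency writeMajority =
  new Replicator.WriteMajority(Duration.create(3, TimeUnit.SECONDS));
Update&lt;LWWMap&lt;LineItem&gt;&gt; update = new Update&lt;&gt;(dataKey, LWWMap.create(), writeMajority,
  cart -> cart.put(node, item.getProductId(), item));
replicator.tell(update, self());
</code></pre>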
<p>
The multi-node test for the <code>ShoppingCart</code> can be found in
<a href="#code/src/multi-jvm/scala/sample/distributeddata/ShoppingCartSpec.scala" class="shortcut">ShoppingCartSpec.scala</a>.
</p>
<p>
Read the
<a href="http://doc.akka.io/docs/akka/2.4-SNAPSHOT/java/distributed-data.html#Consistency" target="_blank">Consistency</a>
section in the documentation to understand the consistency considerations.
</p>
</div>
<div>
<h2>Distributed Service Registry</h2>
<p>
Have you ever had the need to lookup actors by name in an Akka Cluster?
This example illustrates how you could implement such a registry. It is probably not
feature complete, but should be a good starting point.
</p>
<p>
Open <a href="#code/src/main/java/sample/distributeddata/ServiceRegistry.java" class="shortcut">ServiceRegistry.java</a>.
</p>
<p>
<code>ServiceRegistry</code> is an actor that is started on each node in the cluster.
It supports two basic commands:
</p>
<ul>
<li><code>Register</code> to bind an <code>ActorRef</code> to a name;
several actors can be bound to the same name</li>
<li><code>Lookup</code> to get the currently bound services for a given name</li>
</ul>
<p>
For each named service it is using an
<a href="http://doc.akka.io/docs/akka/2.4-SNAPSHOT/java/distributed-data.html#Sets" target="_blank">ORSet</a>.
Here we are using top level <code>ORSet</code> entries. An alternative would have been to use an
<code>ORMultiMap</code> holding all services. That would be a disadvantage if we have many services.
When a data entry is changed the full state of that entry is replicated to other nodes, i.e. when you
update a map the whole map is replicated.
</p>
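<p>
Registering a service then becomes an <code>Update</code> of that <code>ORSet</code>; a sketch,
where the helper <code>serviceKey(name)</code> (an illustrative name) builds an
<code>ORSetKey</code> for the service name and <code>node</code> is the <code>Cluster</code> extension:
</p>
<pre><code>
Update&lt;ORSet&lt;ActorRef&gt;&gt; update = new Update&lt;&gt;(serviceKey(name), ORSet.create(),
  Replicator.writeLocal(), curr -> curr.add(node, serviceRef));
replicator.tell(update, self());
</code></pre>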
<p>
The <code>ServiceRegistry</code> subscribes to changes of a <code>GSet</code> to which the names
of all services are added. It also subscribes to all such service keys to be notified when
actors are added to or removed from a named service.
</p>
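<p>
A sketch of those subscriptions, assuming an illustrative <code>allServicesKey</code> of type
<code>Key&lt;GSet&lt;String&gt;&gt;</code> for the set of service names:
</p>
<pre><code>
// notified when new service names appear
replicator.tell(new Subscribe&lt;&gt;(allServicesKey, self()), ActorRef.noSender());
// notified when the ORSet of ActorRefs bound to a given name changes
replicator.tell(new Subscribe&lt;&gt;(serviceKey(name), self()), ActorRef.noSender());
</code></pre>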
<p>
The multi-node test for the <code>ServiceRegistry</code> can be found in
<a href="#code/src/multi-jvm/scala/sample/distributeddata/ServiceRegistrySpec.scala" class="shortcut">ServiceRegistrySpec.scala</a>.
</p>
</div>
<div>
<h2>Replicated Cache</h2>
<p>
This example illustrates a simple key-value cache.
</p>
<p>
Open <a href="#code/src/main/java/sample/distributeddata/ReplicatedCache.java" class="shortcut">ReplicatedCache.java</a>.
</p>
<p>
<code>ReplicatedCache</code> is an actor that is started on each node in the cluster.
It supports three commands: <code>PutInCache</code>, <code>GetFromCache</code> and <code>Evict</code>.
</p>
<p>
It is splitting up the key space into 100 top level keys, each with a <code>LWWMap</code>.
When a data entry is changed the full state of that entry is replicated to other nodes, i.e. when you
update a map the whole map is replicated. Therefore, instead of using one ORMap with 1000 elements it
is more efficient to split that up into 100 top level ORMap entries with 10 elements each. Top level
entries are replicated individually, which has the trade-off that different entries may not be
replicated at the same time and you may see inconsistencies between related entries.
Separate top level entries cannot be updated atomically together.
</p>
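<p>
A sketch of such a split, assuming the 100 buckets described above (the helper name
<code>dataKey</code> is illustrative):
</p>
<pre><code>
private Key&lt;LWWMap&lt;Object&gt;&gt; dataKey(String entryKey) {
  // hash each cache key into one of 100 top level LWWMap entries
  return LWWMapKey.create("cache-" + (Math.abs(entryKey.hashCode()) % 100));
}
</code></pre>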
<p>
The multi-node test for the <code>ReplicatedCache</code> can be found in
<a href="#code/src/multi-jvm/scala/sample/distributeddata/ReplicatedCacheSpec.scala" class="shortcut">ReplicatedCacheSpec.scala</a>.
</p>
</div>
<div>
<h2>Replicated Metrics</h2>
<p>
This example illustrates how to spread metrics data to all nodes in an Akka cluster.
</p>
<p>
Open <a href="#code/src/main/java/sample/distributeddata/ReplicatedMetrics.java" class="shortcut">ReplicatedMetrics.java</a>.
</p>
<p>
<code>ReplicatedMetrics</code> is an actor that is started on each node in the cluster.
Periodically it collects some metrics, in this case used and max heap size.
Each metrics type is stored in a <code>LWWMap</code> where the key in the map is the address of
the node. The values are disseminated to other nodes with the gossip protocol.
</p>
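<p>
A sketch of such a periodic update (the key name, the address string and the measured value are
illustrative; <code>node</code> is the <code>Cluster</code> extension):
</p>
<pre><code>
Update&lt;LWWMap&lt;Long&gt;&gt; update = new Update&lt;&gt;(usedHeapKey, LWWMap.create(), Replicator.writeLocal(),
  curr -> curr.put(node, selfAddressString, usedHeapBytes));
replicator.tell(update, self());
</code></pre>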
<p>
The multi-node test for the <code>ReplicatedMetrics</code> can be found in
<a href="#code/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala" class="shortcut">ReplicatedMetricsSpec.scala</a>.
</p>
<p>
Note that there are some
<a href="http://doc.akka.io/docs/akka/2.4-M2/scala/distributed-data.html#Limitations" target="_blank">Limitations</a>
that you should be aware of. For example, Akka Distributed Data is not intended for <i>Big Data</i>.
</p>
</div>
</body>
</html>

View file

@@ -101,6 +101,8 @@ class ReplicatedMetrics(measureInterval: FiniteDuration, cleanupInterval: Finite
case MemberRemoved(m, _) ⇒
  nodesInCluster -= nodeKey(m.address)
  if (m.address == cluster.selfAddress)
    context.stop(self)
case Cleanup ⇒
  def cleanupRemoved(data: LWWMap[Long]): LWWMap[Long] =

View file

@@ -35,7 +35,7 @@ object ServiceRegistry {
*/
final case class Bindings(name: String, services: Set[ActorRef])
/**
* Published to `System.eventStream` when services are changed.
* Published to `ActorSystem.eventStream` when services are changed.
*/
final case class BindingChanged(name: String, services: Set[ActorRef])
@@ -78,8 +78,8 @@ class ServiceRegistry extends Actor with ActorLogging {
// add the service
replicator ! Update(dKey, ORSet(), WriteLocal)(_ + service)
case Lookup(key) ⇒
  sender() ! Bindings(key, services.getOrElse(key, Set.empty))
case Lookup(name) ⇒
  sender() ! Bindings(name, services.getOrElse(name, Set.empty))
case c @ Changed(AllServicesKey) ⇒
  val newKeys = c.get(AllServicesKey).elements

View file

@@ -69,15 +69,16 @@ class ShoppingCart(userId: String) extends Actor {
cart ⇒ updateCart(cart, item)
}
replicator ! update
case GetFailure(DataKey, Some(AddItem(item))) ⇒
// ReadMajority of Update failed, fall back to best effort local value
replicator ! Update(DataKey, LWWMap.empty[LineItem], writeMajority, None) {
cart ⇒ updateCart(cart, item)
}
}
//#add-item
def updateCart(data: LWWMap[LineItem], item: LineItem): LWWMap[LineItem] =
data.get(item.productId) match {
case Some(LineItem(_, _, existingQuantity)) ⇒
  data + (item.productId -> item.copy(quantity = existingQuantity + item.quantity))
case None ⇒ data + (item.productId -> item)
}
//#remove-item
def receiveRemoveItem: Receive = {
case cmd @ RemoveItem(productId) ⇒
@@ -107,11 +108,4 @@ class ShoppingCart(userId: String) extends Actor {
case e: UpdateFailure[_] ⇒ throw new IllegalStateException("Unexpected failure: " + e)
}
def updateCart(data: LWWMap[LineItem], item: LineItem): LWWMap[LineItem] =
data.get(item.productId) match {
case Some(LineItem(_, _, existingQuantity)) ⇒
  data + (item.productId -> item.copy(quantity = existingQuantity + item.quantity))
case None ⇒ data + (item.productId -> item)
}
}

View file

@@ -61,14 +61,14 @@ class ReplicatedMetricsSpec extends MultiNodeSpec(ReplicatedMetricsSpec) with ST
val probe = TestProbe()
system.eventStream.subscribe(probe.ref, classOf[UsedHeap])
awaitAssert {
probe.expectMsgType[UsedHeap].percentPerNode.size should be(3)
probe.expectMsgType[UsedHeap](1.second).percentPerNode.size should be(3)
}
probe.expectMsgType[UsedHeap].percentPerNode.size should be(3)
probe.expectMsgType[UsedHeap].percentPerNode.size should be(3)
enterBarrier("after-2")
}
"cleanup removed node" in within(15.seconds) {
"cleanup removed node" in within(25.seconds) {
val node3Address = node(node3).address
runOn(node1) {
cluster.leave(node3Address)
@@ -77,7 +77,7 @@ class ReplicatedMetricsSpec extends MultiNodeSpec(ReplicatedMetricsSpec) with ST
val probe = TestProbe()
system.eventStream.subscribe(probe.ref, classOf[UsedHeap])
awaitAssert {
probe.expectMsgType[UsedHeap].percentPerNode.size should be(2)
probe.expectMsgType[UsedHeap](1.second).percentPerNode.size should be(2)
}
probe.expectMsgType[UsedHeap].percentPerNode should not contain (
nodeKey(node3Address))

View file

@@ -45,6 +45,12 @@ It is eventually consistent and geared toward providing high read and write avai
out-of-date value.
</p>
<p>
Note that there are some
<a href="http://doc.akka.io/docs/akka/2.4-SNAPSHOT/scala/distributed-data.html#Limitations" target="_blank">Limitations</a>
that you should be aware of. For example, Akka Distributed Data is not intended for <i>Big Data</i>.
</p>
</div>
<div>
@@ -127,6 +133,7 @@ The multi-node test for the <code>VotingService</code> can be found in
Read the
<a href="http://doc.akka.io/docs/akka/2.4-SNAPSHOT/scala/distributed-data.html#Using_the_Replicator" target="_blank">Using the Replicator</a>
documentation for more details of how to use <code>Get</code>, <code>Update</code>, and <code>Subscribe</code>.
</p>
</div>

View file

@@ -212,7 +212,7 @@ object AkkaBuild extends Build {
sampleMainJava, sampleMainScala, sampleMainJavaLambda, sampleMultiNodeScala,
samplePersistenceJava, samplePersistenceScala, samplePersistenceJavaLambda,
sampleRemoteJava, sampleRemoteScala, sampleSupervisionJavaLambda,
sampleDistributedDataScala)
sampleDistributedDataScala, sampleDistributedDataJava)
)
lazy val sampleCamelJava = Sample.project("akka-sample-camel-java")
@@ -240,6 +240,7 @@ object AkkaBuild extends Build {
lazy val sampleSupervisionJavaLambda = Sample.project("akka-sample-supervision-java-lambda")
lazy val sampleDistributedDataScala = Sample.project("akka-sample-distributed-data-scala")
lazy val sampleDistributedDataJava = Sample.project("akka-sample-distributed-data-java")
lazy val osgiDiningHakkersSampleMavenTest = Project(id = "akka-sample-osgi-dining-hakkers-maven-test",
base = file("akka-samples/akka-sample-osgi-dining-hakkers-maven-test"),