diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ReplicatedData.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ReplicatedData.scala index 9f344ad9fb..736f7d9e53 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ReplicatedData.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ReplicatedData.scala @@ -24,6 +24,10 @@ import akka.cluster.UniqueAddress * a new instance. */ trait ReplicatedData { + /** + * The type of the concrete implementation, e.g. `GSet[A]`. + * To be specified by subclass. + */ type T <: ReplicatedData /** @@ -34,14 +38,26 @@ trait ReplicatedData { } /** - * Java API: Interface for implementing a [[ReplicatedData]] in - * Java. + * Java API: Interface for implementing a [[ReplicatedData]] in Java. + * + * The type parameter `D` is a self-recursive type to be defined by the + * concrete implementation. + * E.g. `class TwoPhaseSet extends AbstractReplicatedData<TwoPhaseSet>` */ -abstract class AbstractReplicatedData extends ReplicatedData { - // it is not possible to use a more strict type, because it is erased somehow, and - // the implementation is anyway required to implement - // merge(that: ReplicatedData): ReplicatedData - type T = AbstractReplicatedData +abstract class AbstractReplicatedData[D <: AbstractReplicatedData[D]] extends ReplicatedData { + + override type T = ReplicatedData + + /** + * Delegates to [[#mergeData]], which must be implemented by subclass. + */ + final override def merge(that: ReplicatedData): ReplicatedData = + mergeData(that.asInstanceOf[D]) + + /** + * Java API: Monotonic merge function. + */ + def mergeData(that: D): D } @@ -52,7 +68,7 @@ abstract class AbstractReplicatedData extends ReplicatedData { * used by the [[Replicator]] to collapse data from the removed node * into some other node in the cluster. 
 */ -trait RemovedNodePruning { this: ReplicatedData ⇒ +trait RemovedNodePruning extends ReplicatedData { /** * Does it have any state changes from a specific node, diff --git a/akka-distributed-data/src/test/java/akka/cluster/ddata/JavaImplOfReplicatedData.java b/akka-distributed-data/src/test/java/akka/cluster/ddata/JavaImplOfReplicatedData.java index d2f34dce75..d45ea2687c 100644 --- a/akka-distributed-data/src/test/java/akka/cluster/ddata/JavaImplOfReplicatedData.java +++ b/akka-distributed-data/src/test/java/akka/cluster/ddata/JavaImplOfReplicatedData.java @@ -5,10 +5,11 @@ package akka.cluster.ddata; import akka.cluster.UniqueAddress; -public class JavaImplOfReplicatedData extends AbstractReplicatedData implements RemovedNodePruning { +public class JavaImplOfReplicatedData extends AbstractReplicatedData<JavaImplOfReplicatedData> implements + RemovedNodePruning { @Override - public JavaImplOfReplicatedData merge(ReplicatedData other) { + public JavaImplOfReplicatedData mergeData(JavaImplOfReplicatedData other) { return this; } diff --git a/akka-docs/rst/java/code/docs/ddata/japi/TwoPhaseSet.java b/akka-docs/rst/java/code/docs/ddata/japi/TwoPhaseSet.java new file mode 100644 index 0000000000..ca99fabd56 --- /dev/null +++ b/akka-docs/rst/java/code/docs/ddata/japi/TwoPhaseSet.java @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2015 Typesafe Inc. 
 */ +package docs.ddata.japi; + +import java.util.HashSet; + +import java.util.Set; + +import akka.cluster.ddata.AbstractReplicatedData; +import akka.cluster.ddata.GSet; + +//#twophaseset +public class TwoPhaseSet extends AbstractReplicatedData<TwoPhaseSet> { + + public final GSet<String> adds; + public final GSet<String> removals; + + public TwoPhaseSet(GSet<String> adds, GSet<String> removals) { + this.adds = adds; + this.removals = removals; + } + + public static TwoPhaseSet create() { + return new TwoPhaseSet(GSet.<String>create(), GSet.<String>create()); + } + + public TwoPhaseSet add(String element) { + return new TwoPhaseSet(adds.add(element), removals); + } + + public TwoPhaseSet remove(String element) { + return new TwoPhaseSet(adds, removals.add(element)); + } + + public Set<String> getElements() { + Set<String> result = new HashSet<>(adds.getElements()); + result.removeAll(removals.getElements()); + return result; + } + + @Override + public TwoPhaseSet mergeData(TwoPhaseSet that) { + return new TwoPhaseSet(this.adds.merge(that.adds), + this.removals.merge(that.removals)); + } +} +//#twophaseset diff --git a/akka-docs/rst/java/code/docs/ddata/japi/protobuf/TwoPhaseSetSerializer.java b/akka-docs/rst/java/code/docs/ddata/japi/protobuf/TwoPhaseSetSerializer.java new file mode 100644 index 0000000000..c75e0d194d --- /dev/null +++ b/akka-docs/rst/java/code/docs/ddata/japi/protobuf/TwoPhaseSetSerializer.java @@ -0,0 +1,90 @@ +/** + * Copyright (C) 2015 Typesafe Inc. 
+ */ +package docs.ddata.japi.protobuf; + +//#serializer +import docs.ddata.japi.TwoPhaseSet; +import docs.ddata.protobuf.msg.TwoPhaseSetMessages; +import docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet.Builder; +import java.util.ArrayList; +import java.util.Collections; + +import akka.actor.ExtendedActorSystem; +import akka.cluster.ddata.GSet; +import akka.cluster.ddata.protobuf.AbstractSerializationSupport; + +public class TwoPhaseSetSerializer extends AbstractSerializationSupport { + + private final ExtendedActorSystem system; + + public TwoPhaseSetSerializer(ExtendedActorSystem system) { + this.system = system; + } + + @Override + public ExtendedActorSystem system() { + return this.system; + } + + @Override + public boolean includeManifest() { + return false; + } + + @Override + public int identifier() { + return 99998; + } + + @Override + public byte[] toBinary(Object obj) { + if (obj instanceof TwoPhaseSet) { + return twoPhaseSetToProto((TwoPhaseSet) obj).toByteArray(); + } else { + throw new IllegalArgumentException( + "Can't serialize object of type " + obj.getClass()); + } + } + + @Override + public Object fromBinaryJava(byte[] bytes, Class manifest) { + return twoPhaseSetFromBinary(bytes); + } + + protected TwoPhaseSetMessages.TwoPhaseSet twoPhaseSetToProto(TwoPhaseSet twoPhaseSet) { + Builder b = TwoPhaseSetMessages.TwoPhaseSet.newBuilder(); + ArrayList adds = new ArrayList<>(twoPhaseSet.adds.getElements()); + if (!adds.isEmpty()) { + Collections.sort(adds); + b.addAllAdds(adds); + } + ArrayList removals = new ArrayList<>(twoPhaseSet.removals.getElements()); + if (!removals.isEmpty()) { + Collections.sort(removals); + b.addAllRemovals(removals); + } + return b.build(); + } + + protected TwoPhaseSet twoPhaseSetFromBinary(byte[] bytes) { + try { + TwoPhaseSetMessages.TwoPhaseSet msg = + TwoPhaseSetMessages.TwoPhaseSet.parseFrom(bytes); + GSet adds = GSet.create(); + for (String elem : msg.getAddsList()) { + adds = adds.add(elem); + } + GSet 
removals = GSet.create(); + for (String elem : msg.getRemovalsList()) { + removals = removals.add(elem); + } + return new TwoPhaseSet(adds, removals); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } +} +//#serializer + + diff --git a/akka-docs/rst/java/code/docs/ddata/japi/protobuf/TwoPhaseSetSerializer2.java b/akka-docs/rst/java/code/docs/ddata/japi/protobuf/TwoPhaseSetSerializer2.java new file mode 100644 index 0000000000..3497b02ff3 --- /dev/null +++ b/akka-docs/rst/java/code/docs/ddata/japi/protobuf/TwoPhaseSetSerializer2.java @@ -0,0 +1,89 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package docs.ddata.japi.protobuf; + +//#serializer +import docs.ddata.japi.TwoPhaseSet; +import docs.ddata.protobuf.msg.TwoPhaseSetMessages; +import docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2.Builder; +import java.util.ArrayList; +import java.util.Collections; + +import akka.actor.ExtendedActorSystem; +import akka.cluster.ddata.GSet; +import akka.cluster.ddata.protobuf.AbstractSerializationSupport; +import akka.cluster.ddata.protobuf.ReplicatedDataSerializer; + +public class TwoPhaseSetSerializer2 extends AbstractSerializationSupport { + + private final ExtendedActorSystem system; + private final ReplicatedDataSerializer replicatedDataSerializer; + + public TwoPhaseSetSerializer2(ExtendedActorSystem system) { + this.system = system; + this.replicatedDataSerializer = new ReplicatedDataSerializer(system); + } + + @Override + public ExtendedActorSystem system() { + return this.system; + } + + @Override + public boolean includeManifest() { + return false; + } + + @Override + public int identifier() { + return 99998; + } + + @Override + public byte[] toBinary(Object obj) { + if (obj instanceof TwoPhaseSet) { + return twoPhaseSetToProto((TwoPhaseSet) obj).toByteArray(); + } else { + throw new IllegalArgumentException( + "Can't serialize object of type " + obj.getClass()); + } + } + + @Override + public Object fromBinaryJava(byte[] 
bytes, Class<?> manifest) { + return twoPhaseSetFromBinary(bytes); + } + + protected TwoPhaseSetMessages.TwoPhaseSet2 twoPhaseSetToProto(TwoPhaseSet twoPhaseSet) { + Builder b = TwoPhaseSetMessages.TwoPhaseSet2.newBuilder(); + if (!twoPhaseSet.adds.isEmpty()) + b.setAdds(otherMessageToProto(twoPhaseSet.adds).toByteString()); + if (!twoPhaseSet.removals.isEmpty()) + b.setRemovals(otherMessageToProto(twoPhaseSet.removals).toByteString()); + return b.build(); + } + + @SuppressWarnings("unchecked") + protected TwoPhaseSet twoPhaseSetFromBinary(byte[] bytes) { + try { + TwoPhaseSetMessages.TwoPhaseSet2 msg = + TwoPhaseSetMessages.TwoPhaseSet2.parseFrom(bytes); + + GSet<String> adds = GSet.create(); + if (msg.hasAdds()) + adds = (GSet<String>) otherMessageFromBinary(msg.getAdds().toByteArray()); + + GSet<String> removals = GSet.create(); + if (msg.hasRemovals()) + removals = (GSet<String>) otherMessageFromBinary(msg.getRemovals().toByteArray()); + + return new TwoPhaseSet(adds, removals); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } +} +//#serializer + + diff --git a/akka-docs/rst/java/code/docs/ddata/japi/protobuf/TwoPhaseSetSerializerWithCompression.java b/akka-docs/rst/java/code/docs/ddata/japi/protobuf/TwoPhaseSetSerializerWithCompression.java new file mode 100644 index 0000000000..2927dd3e67 --- /dev/null +++ b/akka-docs/rst/java/code/docs/ddata/japi/protobuf/TwoPhaseSetSerializerWithCompression.java @@ -0,0 +1,32 @@ +/** + * Copyright (C) 2015 Typesafe Inc. 
+ */ +package docs.ddata.japi.protobuf; + +import docs.ddata.japi.TwoPhaseSet; + +import akka.actor.ExtendedActorSystem; + +public class TwoPhaseSetSerializerWithCompression extends TwoPhaseSetSerializer { + public TwoPhaseSetSerializerWithCompression(ExtendedActorSystem system) { + super(system); + } + + //#compression + @Override + public byte[] toBinary(Object obj) { + if (obj instanceof TwoPhaseSet) { + return compress(twoPhaseSetToProto((TwoPhaseSet) obj)); + } else { + throw new IllegalArgumentException( + "Can't serialize object of type " + obj.getClass()); + } + } + + @Override + public Object fromBinaryJava(byte[] bytes, Class manifest) { + return twoPhaseSetFromBinary(decompress(bytes)); + } + //#compression +} + diff --git a/akka-docs/rst/java/distributed-data.rst b/akka-docs/rst/java/distributed-data.rst index 8c9efc9781..14ebea1956 100644 --- a/akka-docs/rst/java/distributed-data.rst +++ b/akka-docs/rst/java/distributed-data.rst @@ -372,7 +372,7 @@ Custom Data Type ---------------- You can rather easily implement your own data types. The only requirement is that it implements -the ``merge`` function of the ``AbstractReplicatedData`` class. +the ``mergeData`` function of the ``AbstractReplicatedData`` class. A nice property of stateful CRDTs is that they typically compose nicely, i.e. you can combine several smaller data types to build richer data structures. For example, the ``PNCounter`` is composed of @@ -382,9 +382,7 @@ Here is s simple implementation of a custom ``TwoPhaseSet`` that is using two in to keep track of addition and removals. A ``TwoPhaseSet`` is a set where an element may be added and removed, but never added again thereafter. -**FIXME convert this example to Java** - -.. includecode:: ../scala/code/docs/ddata/TwoPhaseSet.scala#twophaseset +.. includecode:: code/docs/ddata/japi/TwoPhaseSet.java#twophaseset Data types should be immutable, i.e. "modifying" methods should return a new instance. 
@@ -407,23 +405,19 @@ This is a protobuf representation of the above ``TwoPhaseSet``: The serializer for the ``TwoPhaseSet``: -**FIXME convert this example to Java** - -.. includecode:: ../scala/code/docs/ddata/protobuf/TwoPhaseSetSerializer.scala#serializer +.. includecode:: code/docs/ddata/japi/protobuf/TwoPhaseSetSerializer.java#serializer Note that the elements of the sets are sorted so the SHA-1 digests are the same for the same elements. You register the serializer in configuration: -.. includecode:: ../scala/code/docs/ddata/DistributedDataDocSpec.scala#serializer-config +.. includecode:: ../scala/code/docs/ddata/DistributedDataDocSpec.scala#japi-serializer-config Using compression can sometimes be a good idea to reduce the data size. Gzip compression is provided by the ``akka.cluster.ddata.protobuf.SerializationSupport`` trait: -**FIXME convert this example to Java** - -.. includecode:: ../scala/code/docs/ddata/protobuf/TwoPhaseSetSerializer.scala#compression +.. includecode:: code/docs/ddata/japi/protobuf/TwoPhaseSetSerializerWithCompression.java#compression The two embedded ``GSet`` can be serialized as illustrated above, but in general when composing new data types from the existing built in types it is better to make use of the existing @@ -436,9 +430,7 @@ by the ``SerializationSupport`` trait to serialize and deserialize the ``GSet`` works with any type that has a registered Akka serializer. This is how such an serializer would look like for the ``TwoPhaseSet``: -**FIXME convert this example to Java** - -.. includecode:: ../scala/code/docs/ddata/protobuf/TwoPhaseSetSerializer2.scala#serializer +.. 
includecode:: code/docs/ddata/japi/protobuf/TwoPhaseSetSerializer2.java#serializer CRDT Garbage diff --git a/akka-docs/rst/scala/code/docs/ddata/DistributedDataDocSpec.scala b/akka-docs/rst/scala/code/docs/ddata/DistributedDataDocSpec.scala index 2b4163c9f5..8a450c0615 100644 --- a/akka-docs/rst/scala/code/docs/ddata/DistributedDataDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/ddata/DistributedDataDocSpec.scala @@ -33,6 +33,17 @@ object DistributedDataDocSpec { } } #//#serializer-config + + #//#japi-serializer-config + akka.actor { + serializers { + twophaseset = "docs.ddata.japi.protobuf.TwoPhaseSetSerializer" + } + serialization-bindings { + "docs.ddata.japi.TwoPhaseSet" = twophaseset + } + } + #//#japi-serializer-config """ //#data-bot @@ -391,4 +402,14 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) { s1 should be(s1) } + "test japi.TwoPhaseSetSerializer" in { + import scala.collection.JavaConverters._ + val s1 = japi.TwoPhaseSet.create().add("a").add("b").add("c").remove("b") + s1.getElements.asScala should be(Set("a", "c")) + val serializer = SerializationExtension(system).findSerializerFor(s1) + val blob = serializer.toBinary(s1) + val s2 = serializer.fromBinary(blob, None) + s2 should be(s1) + } + } diff --git a/akka-docs/rst/scala/code/docs/ddata/TwoPhaseSet.scala b/akka-docs/rst/scala/code/docs/ddata/TwoPhaseSet.scala index 4afa40b96b..48b5e71e37 100644 --- a/akka-docs/rst/scala/code/docs/ddata/TwoPhaseSet.scala +++ b/akka-docs/rst/scala/code/docs/ddata/TwoPhaseSet.scala @@ -23,7 +23,7 @@ case class TwoPhaseSet( override def merge(that: TwoPhaseSet): TwoPhaseSet = copy( - adds = GSet(this.adds.elements ++ that.adds.elements), - removals = GSet(this.removals.elements ++ that.removals.elements)) + adds = this.adds.merge(that.adds), + removals = this.removals.merge(that.removals)) } //#twophaseset