!cdd #17770 Use self recursive type in AbstractReplicatedData (Java API)

* complete the TwoPhaseSet java sample with serialization
This commit is contained in:
Patrik Nordwall 2015-07-02 12:12:34 +02:00
parent 49e6e7f38c
commit da07a2e68e
9 changed files with 315 additions and 26 deletions

View file

@ -24,6 +24,10 @@ import akka.cluster.UniqueAddress
* a new instance. * a new instance.
*/ */
trait ReplicatedData { trait ReplicatedData {
/**
* The type of the concrete implementation, e.g. `GSet[A]`.
* To be specified by subclass.
*/
type T <: ReplicatedData type T <: ReplicatedData
/** /**
@ -34,14 +38,26 @@ trait ReplicatedData {
} }
/** /**
* Java API: Interface for implementing a [[ReplicatedData]] in * Java API: Interface for implementing a [[ReplicatedData]] in Java.
* Java. *
* The type parameter `D` is a self-recursive type to be defined by the
* concrete implementation.
* E.g. `class TwoPhaseSet extends AbstractReplicatedData&lt;TwoPhaseSet&gt;`
*/ */
abstract class AbstractReplicatedData extends ReplicatedData { abstract class AbstractReplicatedData[D <: AbstractReplicatedData[D]] extends ReplicatedData {
// it is not possible to use a more strict type, because it is erased somehow, and
// the implementation is anyway required to implement override type T = ReplicatedData
// merge(that: ReplicatedData): ReplicatedData
type T = AbstractReplicatedData /**
* Delegates to [[#mergeData]], which must be implemented by subclass.
*/
final override def merge(that: ReplicatedData): ReplicatedData =
mergeData(that.asInstanceOf[D])
/**
* Java API: Monotonic merge function.
*/
def mergeData(that: D): D
} }
@ -52,7 +68,7 @@ abstract class AbstractReplicatedData extends ReplicatedData {
* used by the [[Replicator]] to collapse data from the removed node * used by the [[Replicator]] to collapse data from the removed node
* into some other node in the cluster. * into some other node in the cluster.
*/ */
trait RemovedNodePruning { this: ReplicatedData trait RemovedNodePruning extends ReplicatedData {
/** /**
* Does it have any state changes from a specific node, * Does it have any state changes from a specific node,

View file

@ -5,10 +5,11 @@ package akka.cluster.ddata;
import akka.cluster.UniqueAddress; import akka.cluster.UniqueAddress;
public class JavaImplOfReplicatedData extends AbstractReplicatedData implements RemovedNodePruning { public class JavaImplOfReplicatedData extends AbstractReplicatedData<JavaImplOfReplicatedData> implements
RemovedNodePruning {
@Override @Override
public JavaImplOfReplicatedData merge(ReplicatedData other) { public JavaImplOfReplicatedData mergeData(JavaImplOfReplicatedData other) {
return this; return this;
} }

View file

@ -0,0 +1,48 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.ddata.japi;
import java.util.HashSet;
import java.util.Set;
import akka.cluster.ddata.AbstractReplicatedData;
import akka.cluster.ddata.GSet;
//#twophaseset
public class TwoPhaseSet extends AbstractReplicatedData<TwoPhaseSet> {
public final GSet<String> adds;
public final GSet<String> removals;
public TwoPhaseSet(GSet<String> adds, GSet<String> removals) {
this.adds = adds;
this.removals = removals;
}
public static TwoPhaseSet create() {
return new TwoPhaseSet(GSet.create(), GSet.create());
}
public TwoPhaseSet add(String element) {
return new TwoPhaseSet(adds.add(element), removals);
}
public TwoPhaseSet remove(String element) {
return new TwoPhaseSet(adds, removals.add(element));
}
public Set<String> getElements() {
Set<String> result = new HashSet<>(adds.getElements());
result.removeAll(removals.getElements());
return result;
}
@Override
public TwoPhaseSet mergeData(TwoPhaseSet that) {
return new TwoPhaseSet(this.adds.merge(that.adds),
this.removals.merge(that.removals));
}
}
//#twophaseset

View file

@ -0,0 +1,90 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.ddata.japi.protobuf;
//#serializer
import docs.ddata.japi.TwoPhaseSet;
import docs.ddata.protobuf.msg.TwoPhaseSetMessages;
import docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet.Builder;
import java.util.ArrayList;
import java.util.Collections;
import akka.actor.ExtendedActorSystem;
import akka.cluster.ddata.GSet;
import akka.cluster.ddata.protobuf.AbstractSerializationSupport;
public class TwoPhaseSetSerializer extends AbstractSerializationSupport {
private final ExtendedActorSystem system;
public TwoPhaseSetSerializer(ExtendedActorSystem system) {
this.system = system;
}
@Override
public ExtendedActorSystem system() {
return this.system;
}
@Override
public boolean includeManifest() {
return false;
}
@Override
public int identifier() {
return 99998;
}
@Override
public byte[] toBinary(Object obj) {
if (obj instanceof TwoPhaseSet) {
return twoPhaseSetToProto((TwoPhaseSet) obj).toByteArray();
} else {
throw new IllegalArgumentException(
"Can't serialize object of type " + obj.getClass());
}
}
@Override
public Object fromBinaryJava(byte[] bytes, Class<?> manifest) {
return twoPhaseSetFromBinary(bytes);
}
protected TwoPhaseSetMessages.TwoPhaseSet twoPhaseSetToProto(TwoPhaseSet twoPhaseSet) {
Builder b = TwoPhaseSetMessages.TwoPhaseSet.newBuilder();
ArrayList<String> adds = new ArrayList<>(twoPhaseSet.adds.getElements());
if (!adds.isEmpty()) {
Collections.sort(adds);
b.addAllAdds(adds);
}
ArrayList<String> removals = new ArrayList<>(twoPhaseSet.removals.getElements());
if (!removals.isEmpty()) {
Collections.sort(removals);
b.addAllRemovals(removals);
}
return b.build();
}
protected TwoPhaseSet twoPhaseSetFromBinary(byte[] bytes) {
try {
TwoPhaseSetMessages.TwoPhaseSet msg =
TwoPhaseSetMessages.TwoPhaseSet.parseFrom(bytes);
GSet<String> adds = GSet.create();
for (String elem : msg.getAddsList()) {
adds = adds.add(elem);
}
GSet<String> removals = GSet.create();
for (String elem : msg.getRemovalsList()) {
removals = removals.add(elem);
}
return new TwoPhaseSet(adds, removals);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
//#serializer

View file

@ -0,0 +1,89 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.ddata.japi.protobuf;
//#serializer
import docs.ddata.japi.TwoPhaseSet;
import docs.ddata.protobuf.msg.TwoPhaseSetMessages;
import docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2.Builder;
import java.util.ArrayList;
import java.util.Collections;
import akka.actor.ExtendedActorSystem;
import akka.cluster.ddata.GSet;
import akka.cluster.ddata.protobuf.AbstractSerializationSupport;
import akka.cluster.ddata.protobuf.ReplicatedDataSerializer;
public class TwoPhaseSetSerializer2 extends AbstractSerializationSupport {
private final ExtendedActorSystem system;
private final ReplicatedDataSerializer replicatedDataSerializer;
public TwoPhaseSetSerializer2(ExtendedActorSystem system) {
this.system = system;
this.replicatedDataSerializer = new ReplicatedDataSerializer(system);
}
@Override
public ExtendedActorSystem system() {
return this.system;
}
@Override
public boolean includeManifest() {
return false;
}
@Override
public int identifier() {
return 99998;
}
@Override
public byte[] toBinary(Object obj) {
if (obj instanceof TwoPhaseSet) {
return twoPhaseSetToProto((TwoPhaseSet) obj).toByteArray();
} else {
throw new IllegalArgumentException(
"Can't serialize object of type " + obj.getClass());
}
}
@Override
public Object fromBinaryJava(byte[] bytes, Class<?> manifest) {
return twoPhaseSetFromBinary(bytes);
}
protected TwoPhaseSetMessages.TwoPhaseSet2 twoPhaseSetToProto(TwoPhaseSet twoPhaseSet) {
Builder b = TwoPhaseSetMessages.TwoPhaseSet2.newBuilder();
if (!twoPhaseSet.adds.isEmpty())
b.setAdds(otherMessageToProto(twoPhaseSet.adds).toByteString());
if (!twoPhaseSet.removals.isEmpty())
b.setRemovals(otherMessageToProto(twoPhaseSet.removals).toByteString());
return b.build();
}
@SuppressWarnings("unchecked")
protected TwoPhaseSet twoPhaseSetFromBinary(byte[] bytes) {
try {
TwoPhaseSetMessages.TwoPhaseSet2 msg =
TwoPhaseSetMessages.TwoPhaseSet2.parseFrom(bytes);
GSet<String> adds = GSet.create();
if (msg.hasAdds())
adds = (GSet<String>) otherMessageFromBinary(msg.getAdds().toByteArray());
GSet<String> removals = GSet.create();
if (msg.hasRemovals())
adds = (GSet<String>) otherMessageFromBinary(msg.getRemovals().toByteArray());
return new TwoPhaseSet(adds, removals);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
//#serializer

View file

@ -0,0 +1,32 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.ddata.japi.protobuf;
import docs.ddata.japi.TwoPhaseSet;
import akka.actor.ExtendedActorSystem;
public class TwoPhaseSetSerializerWithCompression extends TwoPhaseSetSerializer {
public TwoPhaseSetSerializerWithCompression(ExtendedActorSystem system) {
super(system);
}
//#compression
@Override
public byte[] toBinary(Object obj) {
if (obj instanceof TwoPhaseSet) {
return compress(twoPhaseSetToProto((TwoPhaseSet) obj));
} else {
throw new IllegalArgumentException(
"Can't serialize object of type " + obj.getClass());
}
}
@Override
public Object fromBinaryJava(byte[] bytes, Class<?> manifest) {
return twoPhaseSetFromBinary(decompress(bytes));
}
//#compression
}

View file

@ -372,7 +372,7 @@ Custom Data Type
---------------- ----------------
You can rather easily implement your own data types. The only requirement is that it implements You can rather easily implement your own data types. The only requirement is that it implements
the ``merge`` function of the ``AbstractReplicatedData`` class. the ``mergeData`` function of the ``AbstractReplicatedData`` class.
A nice property of stateful CRDTs is that they typically compose nicely, i.e. you can combine several A nice property of stateful CRDTs is that they typically compose nicely, i.e. you can combine several
smaller data types to build richer data structures. For example, the ``PNCounter`` is composed of smaller data types to build richer data structures. For example, the ``PNCounter`` is composed of
@ -382,9 +382,7 @@ Here is s simple implementation of a custom ``TwoPhaseSet`` that is using two in
to keep track of addition and removals. A ``TwoPhaseSet`` is a set where an element may be added and to keep track of addition and removals. A ``TwoPhaseSet`` is a set where an element may be added and
removed, but never added again thereafter. removed, but never added again thereafter.
**FIXME convert this example to Java** .. includecode:: code/docs/ddata/japi/TwoPhaseSet.java#twophaseset
.. includecode:: ../scala/code/docs/ddata/TwoPhaseSet.scala#twophaseset
Data types should be immutable, i.e. "modifying" methods should return a new instance. Data types should be immutable, i.e. "modifying" methods should return a new instance.
@ -407,23 +405,19 @@ This is a protobuf representation of the above ``TwoPhaseSet``:
The serializer for the ``TwoPhaseSet``: The serializer for the ``TwoPhaseSet``:
**FIXME convert this example to Java** .. includecode:: code/docs/ddata/japi/protobuf/TwoPhaseSetSerializer.java#serializer
.. includecode:: ../scala/code/docs/ddata/protobuf/TwoPhaseSetSerializer.scala#serializer
Note that the elements of the sets are sorted so the SHA-1 digests are the same Note that the elements of the sets are sorted so the SHA-1 digests are the same
for the same elements. for the same elements.
You register the serializer in configuration: You register the serializer in configuration:
.. includecode:: ../scala/code/docs/ddata/DistributedDataDocSpec.scala#serializer-config .. includecode:: ../scala/code/docs/ddata/DistributedDataDocSpec.scala#japi-serializer-config
Using compression can sometimes be a good idea to reduce the data size. Gzip compression is Using compression can sometimes be a good idea to reduce the data size. Gzip compression is
provided by the ``akka.cluster.ddata.protobuf.SerializationSupport`` trait: provided by the ``akka.cluster.ddata.protobuf.SerializationSupport`` trait:
**FIXME convert this example to Java** .. includecode:: code/docs/ddata/japi/protobuf/TwoPhaseSetSerializerWithCompression.java#compression
.. includecode:: ../scala/code/docs/ddata/protobuf/TwoPhaseSetSerializer.scala#compression
The two embedded ``GSet`` can be serialized as illustrated above, but in general when composing The two embedded ``GSet`` can be serialized as illustrated above, but in general when composing
new data types from the existing built in types it is better to make use of the existing new data types from the existing built in types it is better to make use of the existing
@ -436,9 +430,7 @@ by the ``SerializationSupport`` trait to serialize and deserialize the ``GSet``
works with any type that has a registered Akka serializer. This is how such an serializer would works with any type that has a registered Akka serializer. This is how such an serializer would
look like for the ``TwoPhaseSet``: look like for the ``TwoPhaseSet``:
**FIXME convert this example to Java** .. includecode:: code/docs/ddata/japi/protobuf/TwoPhaseSetSerializer2.java#serializer
.. includecode:: ../scala/code/docs/ddata/protobuf/TwoPhaseSetSerializer2.scala#serializer
CRDT Garbage CRDT Garbage

View file

@ -33,6 +33,17 @@ object DistributedDataDocSpec {
} }
} }
#//#serializer-config #//#serializer-config
#//#japi-serializer-config
akka.actor {
serializers {
twophaseset = "docs.ddata.japi.protobuf.TwoPhaseSetSerializer"
}
serialization-bindings {
"docs.ddata.japi.TwoPhaseSet" = twophaseset
}
}
#//#japi-serializer-config
""" """
//#data-bot //#data-bot
@ -391,4 +402,14 @@ class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
s1 should be(s1) s1 should be(s1)
} }
"test japi.TwoPhaseSetSerializer" in {
import scala.collection.JavaConverters._
val s1 = japi.TwoPhaseSet.create().add("a").add("b").add("c").remove("b")
s1.getElements.asScala should be(Set("a", "c"))
val serializer = SerializationExtension(system).findSerializerFor(s1)
val blob = serializer.toBinary(s1)
val s2 = serializer.fromBinary(blob, None)
s1 should be(s1)
}
} }

View file

@ -23,7 +23,7 @@ case class TwoPhaseSet(
override def merge(that: TwoPhaseSet): TwoPhaseSet = override def merge(that: TwoPhaseSet): TwoPhaseSet =
copy( copy(
adds = GSet(this.adds.elements ++ that.adds.elements), adds = this.adds.merge(that.adds),
removals = GSet(this.removals.elements ++ that.removals.elements)) removals = this.removals.merge(that.removals))
} }
//#twophaseset //#twophaseset