Merge pull request #18286 from akka/wip-18258-cluster-client-protobuf-patriknw

=clt #18258 Create protobuf serialization for ClusterClient
This commit is contained in:
Patrik Nordwall 2015-08-21 14:38:10 +02:00
commit 6420a37250
6 changed files with 699 additions and 12 deletions

View file

@ -0,0 +1,547 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: ClusterClientMessages.proto
package akka.cluster.client.protobuf.msg;
public final class ClusterClientMessages {
private ClusterClientMessages() {}
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistry registry) {
}
public interface ContactsOrBuilder
extends com.google.protobuf.MessageOrBuilder {
// repeated string contactPoints = 1;
/**
* <code>repeated string contactPoints = 1;</code>
*/
java.util.List<java.lang.String>
getContactPointsList();
/**
* <code>repeated string contactPoints = 1;</code>
*/
int getContactPointsCount();
/**
* <code>repeated string contactPoints = 1;</code>
*/
java.lang.String getContactPoints(int index);
/**
* <code>repeated string contactPoints = 1;</code>
*/
com.google.protobuf.ByteString
getContactPointsBytes(int index);
}
/**
* Protobuf type {@code Contacts}
*/
public static final class Contacts extends
com.google.protobuf.GeneratedMessage
implements ContactsOrBuilder {
// Use Contacts.newBuilder() to construct.
private Contacts(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
super(builder);
this.unknownFields = builder.getUnknownFields();
}
private Contacts(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
private static final Contacts defaultInstance;
public static Contacts getDefaultInstance() {
return defaultInstance;
}
public Contacts getDefaultInstanceForType() {
return defaultInstance;
}
private final com.google.protobuf.UnknownFieldSet unknownFields;
@java.lang.Override
public final com.google.protobuf.UnknownFieldSet
getUnknownFields() {
return this.unknownFields;
}
private Contacts(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
initFields();
int mutable_bitField0_ = 0;
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder();
try {
boolean done = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
done = true;
}
break;
}
case 10: {
if (!((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
contactPoints_ = new com.google.protobuf.LazyStringArrayList();
mutable_bitField0_ |= 0x00000001;
}
contactPoints_.add(input.readBytes());
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
throw e.setUnfinishedMessage(this);
} catch (java.io.IOException e) {
throw new com.google.protobuf.InvalidProtocolBufferException(
e.getMessage()).setUnfinishedMessage(this);
} finally {
if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
contactPoints_ = new com.google.protobuf.UnmodifiableLazyStringList(contactPoints_);
}
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return akka.cluster.client.protobuf.msg.ClusterClientMessages.internal_static_Contacts_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.cluster.client.protobuf.msg.ClusterClientMessages.internal_static_Contacts_fieldAccessorTable
.ensureFieldAccessorsInitialized(
akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.class, akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.Builder.class);
}
public static com.google.protobuf.Parser<Contacts> PARSER =
new com.google.protobuf.AbstractParser<Contacts>() {
public Contacts parsePartialFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return new Contacts(input, extensionRegistry);
}
};
@java.lang.Override
public com.google.protobuf.Parser<Contacts> getParserForType() {
return PARSER;
}
// repeated string contactPoints = 1;
public static final int CONTACTPOINTS_FIELD_NUMBER = 1;
private com.google.protobuf.LazyStringList contactPoints_;
/**
* <code>repeated string contactPoints = 1;</code>
*/
public java.util.List<java.lang.String>
getContactPointsList() {
return contactPoints_;
}
/**
* <code>repeated string contactPoints = 1;</code>
*/
public int getContactPointsCount() {
return contactPoints_.size();
}
/**
* <code>repeated string contactPoints = 1;</code>
*/
public java.lang.String getContactPoints(int index) {
return contactPoints_.get(index);
}
/**
* <code>repeated string contactPoints = 1;</code>
*/
public com.google.protobuf.ByteString
getContactPointsBytes(int index) {
return contactPoints_.getByteString(index);
}
private void initFields() {
contactPoints_ = com.google.protobuf.LazyStringArrayList.EMPTY;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
memoizedIsInitialized = 1;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
for (int i = 0; i < contactPoints_.size(); i++) {
output.writeBytes(1, contactPoints_.getByteString(i));
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
{
int dataSize = 0;
for (int i = 0; i < contactPoints_.size(); i++) {
dataSize += com.google.protobuf.CodedOutputStream
.computeBytesSizeNoTag(contactPoints_.getByteString(i));
}
size += dataSize;
size += 1 * getContactPointsList().size();
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@java.lang.Override
protected java.lang.Object writeReplace()
throws java.io.ObjectStreamException {
return super.writeReplace();
}
public static akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input);
}
public static akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input, extensionRegistry);
}
public static akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@java.lang.Override
protected Builder newBuilderForType(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
/**
* Protobuf type {@code Contacts}
*/
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder>
implements akka.cluster.client.protobuf.msg.ClusterClientMessages.ContactsOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return akka.cluster.client.protobuf.msg.ClusterClientMessages.internal_static_Contacts_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.cluster.client.protobuf.msg.ClusterClientMessages.internal_static_Contacts_fieldAccessorTable
.ensureFieldAccessorsInitialized(
akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.class, akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.Builder.class);
}
// Construct using akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
}
}
private static Builder create() {
return new Builder();
}
public Builder clear() {
super.clear();
contactPoints_ = com.google.protobuf.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
public Builder clone() {
return create().mergeFrom(buildPartial());
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return akka.cluster.client.protobuf.msg.ClusterClientMessages.internal_static_Contacts_descriptor;
}
public akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts getDefaultInstanceForType() {
return akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.getDefaultInstance();
}
public akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts build() {
akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
public akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts buildPartial() {
akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts result = new akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts(this);
int from_bitField0_ = bitField0_;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
contactPoints_ = new com.google.protobuf.UnmodifiableLazyStringList(
contactPoints_);
bitField0_ = (bitField0_ & ~0x00000001);
}
result.contactPoints_ = contactPoints_;
onBuilt();
return result;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts) {
return mergeFrom((akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts other) {
if (other == akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.getDefaultInstance()) return this;
if (!other.contactPoints_.isEmpty()) {
if (contactPoints_.isEmpty()) {
contactPoints_ = other.contactPoints_;
bitField0_ = (bitField0_ & ~0x00000001);
} else {
ensureContactPointsIsMutable();
contactPoints_.addAll(other.contactPoints_);
}
onChanged();
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
return true;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
parsedMessage = (akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts) e.getUnfinishedMessage();
throw e;
} finally {
if (parsedMessage != null) {
mergeFrom(parsedMessage);
}
}
return this;
}
private int bitField0_;
// repeated string contactPoints = 1;
private com.google.protobuf.LazyStringList contactPoints_ = com.google.protobuf.LazyStringArrayList.EMPTY;
private void ensureContactPointsIsMutable() {
if (!((bitField0_ & 0x00000001) == 0x00000001)) {
contactPoints_ = new com.google.protobuf.LazyStringArrayList(contactPoints_);
bitField0_ |= 0x00000001;
}
}
/**
* <code>repeated string contactPoints = 1;</code>
*/
public java.util.List<java.lang.String>
getContactPointsList() {
return java.util.Collections.unmodifiableList(contactPoints_);
}
/**
* <code>repeated string contactPoints = 1;</code>
*/
public int getContactPointsCount() {
return contactPoints_.size();
}
/**
* <code>repeated string contactPoints = 1;</code>
*/
public java.lang.String getContactPoints(int index) {
return contactPoints_.get(index);
}
/**
* <code>repeated string contactPoints = 1;</code>
*/
public com.google.protobuf.ByteString
getContactPointsBytes(int index) {
return contactPoints_.getByteString(index);
}
/**
* <code>repeated string contactPoints = 1;</code>
*/
public Builder setContactPoints(
int index, java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
ensureContactPointsIsMutable();
contactPoints_.set(index, value);
onChanged();
return this;
}
/**
* <code>repeated string contactPoints = 1;</code>
*/
public Builder addContactPoints(
java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
ensureContactPointsIsMutable();
contactPoints_.add(value);
onChanged();
return this;
}
/**
* <code>repeated string contactPoints = 1;</code>
*/
public Builder addAllContactPoints(
java.lang.Iterable<java.lang.String> values) {
ensureContactPointsIsMutable();
super.addAll(values, contactPoints_);
onChanged();
return this;
}
/**
* <code>repeated string contactPoints = 1;</code>
*/
public Builder clearContactPoints() {
contactPoints_ = com.google.protobuf.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000001);
onChanged();
return this;
}
/**
* <code>repeated string contactPoints = 1;</code>
*/
public Builder addContactPointsBytes(
com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
ensureContactPointsIsMutable();
contactPoints_.add(value);
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:Contacts)
}
static {
defaultInstance = new Contacts(true);
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:Contacts)
}
private static com.google.protobuf.Descriptors.Descriptor
internal_static_Contacts_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_Contacts_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
return descriptor;
}
private static com.google.protobuf.Descriptors.FileDescriptor
descriptor;
static {
java.lang.String[] descriptorData = {
"\n\033ClusterClientMessages.proto\"!\n\010Contact" +
"s\022\025\n\rcontactPoints\030\001 \003(\tB$\n akka.cluster" +
".client.protobuf.msgH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
public com.google.protobuf.ExtensionRegistry assignDescriptors(
com.google.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
internal_static_Contacts_descriptor =
getDescriptor().getMessageTypes().get(0);
internal_static_Contacts_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Contacts_descriptor,
new java.lang.String[] { "ContactPoints", });
return null;
}
};
com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[] {
}, assigner);
}
// @@protoc_insertion_point(outer_class_scope)
}

View file

@ -0,0 +1,11 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
option java_package = "akka.cluster.client.protobuf.msg";
option optimize_for = SPEED;
message Contacts {
repeated string contactPoints = 1;
}

View file

@ -114,6 +114,19 @@ akka.cluster.client {
}
# //#cluster-client-config
# Protobuf serializer for ClusterClient messages
akka.actor {
serializers {
akka-cluster-client = "akka.cluster.client.protobuf.ClusterClientMessageSerializer"
}
serialization-bindings {
"akka.cluster.client.ClusterClientMessage" = akka-cluster-client
}
serialization-identifiers {
"akka.cluster.client.protobuf.ClusterClientMessageSerializer" = 15
}
}
# //#singleton-config
akka.cluster.singleton {
# The actor name of the child singleton actor.

View file

@ -259,7 +259,7 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac
def establishing: Actor.Receive = {
case Contacts(contactPoints)
if (contactPoints.nonEmpty) {
contacts = contactPoints
contacts = contactPoints.map(context.actorSelection)
contacts foreach { _ ! Identify(None) }
}
case ActorIdentity(_, Some(receptionist))
@ -303,7 +303,7 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac
case Contacts(contactPoints)
// refresh of contacts
if (contactPoints.nonEmpty)
contacts = contactPoints
contacts = contactPoints.map(context.actorSelection)
case _: ActorIdentity // ok, from previous establish, already handled
}
@ -490,6 +490,11 @@ final class ClusterReceptionistSettings(
}
/**
* Marker trait for remote messages with special serializer.
*/
sealed trait ClusterClientMessage extends Serializable
object ClusterReceptionist {
/**
@ -505,13 +510,13 @@ object ClusterReceptionist {
*/
private[akka] object Internal {
@SerialVersionUID(1L)
case object GetContacts extends DeadLetterSuppression
case object GetContacts extends ClusterClientMessage with DeadLetterSuppression
@SerialVersionUID(1L)
final case class Contacts(contactPoints: immutable.IndexedSeq[ActorSelection])
final case class Contacts(contactPoints: immutable.IndexedSeq[String]) extends ClusterClientMessage
@SerialVersionUID(1L)
case object Heartbeat extends DeadLetterSuppression
case object Heartbeat extends ClusterClientMessage with DeadLetterSuppression
@SerialVersionUID(1L)
case object HeartbeatRsp extends DeadLetterSuppression
case object HeartbeatRsp extends ClusterClientMessage with DeadLetterSuppression
@SerialVersionUID(1L)
case object Ping extends DeadLetterSuppression
@ -519,12 +524,14 @@ object ClusterReceptionist {
* Replies are tunneled via this actor, child of the receptionist, to avoid
* inbound connections from other cluster nodes to the client.
*/
class ClientResponseTunnel(client: ActorRef, timeout: FiniteDuration) extends Actor {
class ClientResponseTunnel(client: ActorRef, timeout: FiniteDuration) extends Actor with ActorLogging {
context.setReceiveTimeout(timeout)
def receive = {
case Ping // keep alive from client
case ReceiveTimeout context stop self
case msg client.tell(msg, Actor.noSender)
case Ping // keep alive from client
case ReceiveTimeout
log.debug("ClientResponseTunnel for client [{}] stopped due to inactivity", client.path)
context stop self
case msg client.tell(msg, Actor.noSender)
}
}
}
@ -613,6 +620,7 @@ final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterRecep
pubSubMediator.tell(msg, tunnel)
case Heartbeat
log.debug("Heartbeat from client [{}]", sender().path)
sender() ! HeartbeatRsp
case GetContacts
@ -620,7 +628,10 @@ final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterRecep
// is the same from all nodes (most of the time) and it also
// load balances the client connections among the nodes in the cluster.
if (numberOfContacts >= nodes.size) {
sender() ! Contacts(nodes.map(a context.actorSelection(self.path.toStringWithAddress(a)))(collection.breakOut))
val contacts = Contacts(nodes.map(a self.path.toStringWithAddress(a))(collection.breakOut))
if (log.isDebugEnabled)
log.debug("Client [{}] gets contactPoints [{}] (all nodes)", sender().path, contacts.contactPoints.mkString(","))
sender() ! contacts
} else {
// using toStringWithAddress in case the client is local, normally it is not, and
// toStringWithAddress will use the remote address of the client
@ -630,7 +641,10 @@ final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterRecep
if (first.size == numberOfContacts) first
else first ++ nodes.take(numberOfContacts - first.size)
}
sender() ! Contacts(slice.map(a context.actorSelection(self.path.toStringWithAddress(a)))(collection.breakOut))
val contacts = Contacts(slice.map(a self.path.toStringWithAddress(a))(collection.breakOut))
if (log.isDebugEnabled)
log.debug("Client [{}] gets contactPoints [{}]", sender().path, contacts.contactPoints.mkString(","))
sender() ! contacts
}
case state: CurrentClusterState

View file

@ -0,0 +1,69 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.client.protobuf
import scala.collection.JavaConverters._
import akka.actor.ExtendedActorSystem
import akka.serialization.BaseSerializer
import akka.serialization.SerializationExtension
import akka.serialization.SerializerWithStringManifest
import akka.cluster.client.ClusterReceptionist
import akka.cluster.client.protobuf.msg.{ ClusterClientMessages cm }
/**
* INTERNAL API: Serializer of ClusterClient messages.
*/
private[akka] class ClusterClientMessageSerializer(val system: ExtendedActorSystem)
extends SerializerWithStringManifest with BaseSerializer {
import ClusterReceptionist.Internal._
private lazy val serialization = SerializationExtension(system)
private val ContactsManifest = "A"
private val GetContactsManifest = "B"
private val HeartbeatManifest = "C"
private val HeartbeatRspManifest = "D"
private val emptyByteArray = Array.empty[Byte]
private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] AnyRef](
ContactsManifest -> contactsFromBinary,
GetContactsManifest -> { _ GetContacts },
HeartbeatManifest -> { _ Heartbeat },
HeartbeatRspManifest -> { _ HeartbeatRsp })
override def manifest(obj: AnyRef): String = obj match {
case _: Contacts ContactsManifest
case GetContacts GetContactsManifest
case Heartbeat HeartbeatManifest
case HeartbeatRsp HeartbeatRspManifest
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}
override def toBinary(obj: AnyRef): Array[Byte] = obj match {
case m: Contacts contactsToProto(m).toByteArray
case GetContacts emptyByteArray
case Heartbeat emptyByteArray
case HeartbeatRsp emptyByteArray
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
fromBinaryMap.get(manifest) match {
case Some(f) f(bytes)
case None throw new IllegalArgumentException(
s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
}
private def contactsToProto(m: Contacts): cm.Contacts =
cm.Contacts.newBuilder().addAllContactPoints(m.contactPoints.asJava).build()
private def contactsFromBinary(bytes: Array[Byte]): Contacts = {
val m = cm.Contacts.parseFrom(bytes)
Contacts(m.getContactPointsList.asScala.toVector)
}
}

View file

@ -0,0 +1,33 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.client.protobuf
import akka.actor.ExtendedActorSystem
import akka.testkit.AkkaSpec
import akka.cluster.client.ClusterReceptionist.Internal._
class ClusterClientMessageSerializerSpec extends AkkaSpec {
val serializer = new ClusterClientMessageSerializer(system.asInstanceOf[ExtendedActorSystem])
def checkSerialization(obj: AnyRef): Unit = {
val blob = serializer.toBinary(obj)
val ref = serializer.fromBinary(blob, serializer.manifest(obj))
ref should ===(obj)
}
"ClusterClientMessages" must {
"be serializable" in {
val contactPoints = Vector(
"akka.tcp://system@node-1:2552/system/receptionist",
"akka.tcp://system@node-2:2552/system/receptionist",
"akka.tcp://system@node-3:2552/system/receptionist")
checkSerialization(Contacts(contactPoints))
checkSerialization(GetContacts)
checkSerialization(Heartbeat)
checkSerialization(HeartbeatRsp)
}
}
}