diff --git a/akka-cluster-tools/src/main/java/akka/cluster/client/protobuf/msg/ClusterClientMessages.java b/akka-cluster-tools/src/main/java/akka/cluster/client/protobuf/msg/ClusterClientMessages.java new file mode 100644 index 0000000000..ce8f5f9f3f --- /dev/null +++ b/akka-cluster-tools/src/main/java/akka/cluster/client/protobuf/msg/ClusterClientMessages.java @@ -0,0 +1,547 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: ClusterClientMessages.proto + +package akka.cluster.client.protobuf.msg; + +public final class ClusterClientMessages { + private ClusterClientMessages() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + } + public interface ContactsOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // repeated string contactPoints = 1; + /** + * repeated string contactPoints = 1; + */ + java.util.List + getContactPointsList(); + /** + * repeated string contactPoints = 1; + */ + int getContactPointsCount(); + /** + * repeated string contactPoints = 1; + */ + java.lang.String getContactPoints(int index); + /** + * repeated string contactPoints = 1; + */ + com.google.protobuf.ByteString + getContactPointsBytes(int index); + } + /** + * Protobuf type {@code Contacts} + */ + public static final class Contacts extends + com.google.protobuf.GeneratedMessage + implements ContactsOrBuilder { + // Use Contacts.newBuilder() to construct. + private Contacts(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private Contacts(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final Contacts defaultInstance; + public static Contacts getDefaultInstance() { + return defaultInstance; + } + + public Contacts getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private Contacts( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + if (!((mutable_bitField0_ & 0x00000001) == 0x00000001)) { + contactPoints_ = new com.google.protobuf.LazyStringArrayList(); + mutable_bitField0_ |= 0x00000001; + } + contactPoints_.add(input.readBytes()); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) { + contactPoints_ = new com.google.protobuf.UnmodifiableLazyStringList(contactPoints_); + } + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.cluster.client.protobuf.msg.ClusterClientMessages.internal_static_Contacts_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.cluster.client.protobuf.msg.ClusterClientMessages.internal_static_Contacts_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.class, akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public Contacts parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new Contacts(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + // repeated string contactPoints = 1; + public static final int CONTACTPOINTS_FIELD_NUMBER = 1; + private com.google.protobuf.LazyStringList contactPoints_; + /** + * repeated string contactPoints = 1; + */ + public java.util.List + getContactPointsList() { + return contactPoints_; + } + /** + * repeated string contactPoints = 1; + */ + public int getContactPointsCount() { + return contactPoints_.size(); + } + /** + * repeated string contactPoints = 1; + */ + public java.lang.String getContactPoints(int index) { + return contactPoints_.get(index); + } + /** + * repeated string contactPoints = 1; + */ + public com.google.protobuf.ByteString + getContactPointsBytes(int index) { + return contactPoints_.getByteString(index); + } + + private void initFields() { + contactPoints_ = com.google.protobuf.LazyStringArrayList.EMPTY; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + for (int i = 0; i < contactPoints_.size(); i++) { + output.writeBytes(1, contactPoints_.getByteString(i)); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + { + int dataSize = 0; + for (int i = 0; i < contactPoints_.size(); i++) { + dataSize += com.google.protobuf.CodedOutputStream + .computeBytesSizeNoTag(contactPoints_.getByteString(i)); + } + size += dataSize; + size += 1 * getContactPointsList().size(); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code Contacts} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements akka.cluster.client.protobuf.msg.ClusterClientMessages.ContactsOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.cluster.client.protobuf.msg.ClusterClientMessages.internal_static_Contacts_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.cluster.client.protobuf.msg.ClusterClientMessages.internal_static_Contacts_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.class, akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.Builder.class); + } + + // Construct using akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + contactPoints_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.cluster.client.protobuf.msg.ClusterClientMessages.internal_static_Contacts_descriptor; + } + + public akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts getDefaultInstanceForType() { + return akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.getDefaultInstance(); + } + + public akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts build() { + akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts buildPartial() { + akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts result = new akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts(this); + int from_bitField0_ = bitField0_; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + contactPoints_ = new com.google.protobuf.UnmodifiableLazyStringList( + contactPoints_); + bitField0_ = (bitField0_ & ~0x00000001); + } + result.contactPoints_ = contactPoints_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts) { + return mergeFrom((akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts other) { + if (other == akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.getDefaultInstance()) return this; + if (!other.contactPoints_.isEmpty()) { + if (contactPoints_.isEmpty()) { + contactPoints_ = other.contactPoints_; + bitField0_ = (bitField0_ & ~0x00000001); + } else { + ensureContactPointsIsMutable(); + contactPoints_.addAll(other.contactPoints_); + } + onChanged(); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (akka.cluster.client.protobuf.msg.ClusterClientMessages.Contacts) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // repeated string contactPoints = 1; + private com.google.protobuf.LazyStringList contactPoints_ = com.google.protobuf.LazyStringArrayList.EMPTY; + private void ensureContactPointsIsMutable() { + if (!((bitField0_ & 0x00000001) == 0x00000001)) { + contactPoints_ = new com.google.protobuf.LazyStringArrayList(contactPoints_); + bitField0_ |= 0x00000001; + } + } + /** + * repeated string contactPoints = 1; + */ + public java.util.List + getContactPointsList() { + return java.util.Collections.unmodifiableList(contactPoints_); + } + /** + * repeated string contactPoints = 1; + */ + public int getContactPointsCount() { + return contactPoints_.size(); + } + /** + * repeated string contactPoints = 1; + */ + public java.lang.String getContactPoints(int index) { + return contactPoints_.get(index); + } + /** + * repeated string contactPoints = 1; + */ + public com.google.protobuf.ByteString + getContactPointsBytes(int index) { + return contactPoints_.getByteString(index); + } + /** + * repeated string contactPoints = 1; + */ + public Builder setContactPoints( + int index, java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureContactPointsIsMutable(); + contactPoints_.set(index, value); + onChanged(); + return this; + } + /** + * repeated string contactPoints = 1; + */ + public Builder addContactPoints( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureContactPointsIsMutable(); + contactPoints_.add(value); + onChanged(); + return this; + } + /** + * repeated string contactPoints = 1; + */ + public Builder addAllContactPoints( + java.lang.Iterable values) { + ensureContactPointsIsMutable(); + super.addAll(values, contactPoints_); + onChanged(); + return this; + } + /** + * repeated string contactPoints = 1; + */ + public Builder clearContactPoints() { + contactPoints_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000001); + onChanged(); + return this; + } + /** + * repeated string contactPoints = 1; + */ + public Builder addContactPointsBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + ensureContactPointsIsMutable(); + contactPoints_.add(value); + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:Contacts) + } + + static { + defaultInstance = new Contacts(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:Contacts) + } + + private static com.google.protobuf.Descriptors.Descriptor + internal_static_Contacts_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_Contacts_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\033ClusterClientMessages.proto\"!\n\010Contact" + + "s\022\025\n\rcontactPoints\030\001 \003(\tB$\n akka.cluster" + + ".client.protobuf.msgH\001" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_Contacts_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_Contacts_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_Contacts_descriptor, + new java.lang.String[] { "ContactPoints", }); + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/akka-cluster-tools/src/main/protobuf/ClusterClientMessages.proto b/akka-cluster-tools/src/main/protobuf/ClusterClientMessages.proto new file mode 100644 index 0000000000..2238adfb54 --- /dev/null +++ b/akka-cluster-tools/src/main/protobuf/ClusterClientMessages.proto @@ -0,0 +1,11 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +option java_package = "akka.cluster.client.protobuf.msg"; +option optimize_for = SPEED; + +message Contacts { + repeated string contactPoints = 1; +} + diff --git a/akka-cluster-tools/src/main/resources/reference.conf b/akka-cluster-tools/src/main/resources/reference.conf index a933cd5837..2b2c615182 100644 --- a/akka-cluster-tools/src/main/resources/reference.conf +++ b/akka-cluster-tools/src/main/resources/reference.conf @@ -114,6 +114,19 @@ akka.cluster.client { } # //#cluster-client-config +# Protobuf serializer for ClusterClient messages +akka.actor { + serializers { + akka-cluster-client = "akka.cluster.client.protobuf.ClusterClientMessageSerializer" + } + serialization-bindings { + "akka.cluster.client.ClusterClientMessage" = akka-cluster-client + } + serialization-identifiers { + "akka.cluster.client.protobuf.ClusterClientMessageSerializer" = 15 + } +} + # //#singleton-config akka.cluster.singleton { # The actor name of the child singleton actor. diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala index abc68a9256..c90f75b4e9 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala @@ -259,7 +259,7 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac def establishing: Actor.Receive = { case Contacts(contactPoints) ⇒ if (contactPoints.nonEmpty) { - contacts = contactPoints + contacts = contactPoints.map(context.actorSelection) contacts foreach { _ ! Identify(None) } } case ActorIdentity(_, Some(receptionist)) ⇒ @@ -303,7 +303,7 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac case Contacts(contactPoints) ⇒ // refresh of contacts if (contactPoints.nonEmpty) - contacts = contactPoints + contacts = contactPoints.map(context.actorSelection) case _: ActorIdentity ⇒ // ok, from previous establish, already handled } @@ -490,6 +490,11 @@ final class ClusterReceptionistSettings( } +/** + * Marker trait for remote messages with special serializer. + */ +sealed trait ClusterClientMessage extends Serializable + object ClusterReceptionist { /** @@ -505,13 +510,13 @@ object ClusterReceptionist { */ private[akka] object Internal { @SerialVersionUID(1L) - case object GetContacts extends DeadLetterSuppression + case object GetContacts extends ClusterClientMessage with DeadLetterSuppression @SerialVersionUID(1L) - final case class Contacts(contactPoints: immutable.IndexedSeq[ActorSelection]) + final case class Contacts(contactPoints: immutable.IndexedSeq[String]) extends ClusterClientMessage @SerialVersionUID(1L) - case object Heartbeat extends DeadLetterSuppression + case object Heartbeat extends ClusterClientMessage with DeadLetterSuppression @SerialVersionUID(1L) - case object HeartbeatRsp extends DeadLetterSuppression + case object HeartbeatRsp extends ClusterClientMessage with DeadLetterSuppression @SerialVersionUID(1L) case object Ping extends DeadLetterSuppression @@ -519,12 +524,14 @@ object ClusterReceptionist { * Replies are tunneled via this actor, child of the receptionist, to avoid * inbound connections from other cluster nodes to the client. */ - class ClientResponseTunnel(client: ActorRef, timeout: FiniteDuration) extends Actor { + class ClientResponseTunnel(client: ActorRef, timeout: FiniteDuration) extends Actor with ActorLogging { context.setReceiveTimeout(timeout) def receive = { - case Ping ⇒ // keep alive from client - case ReceiveTimeout ⇒ context stop self - case msg ⇒ client.tell(msg, Actor.noSender) + case Ping ⇒ // keep alive from client + case ReceiveTimeout ⇒ + log.debug("ClientResponseTunnel for client [{}] stopped due to inactivity", client.path) + context stop self + case msg ⇒ client.tell(msg, Actor.noSender) } } } @@ -613,6 +620,7 @@ final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterRecep pubSubMediator.tell(msg, tunnel) case Heartbeat ⇒ + log.debug("Heartbeat from client [{}]", sender().path) sender() ! HeartbeatRsp case GetContacts ⇒ @@ -620,7 +628,10 @@ final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterRecep // is the same from all nodes (most of the time) and it also // load balances the client connections among the nodes in the cluster. if (numberOfContacts >= nodes.size) { - sender() ! Contacts(nodes.map(a ⇒ context.actorSelection(self.path.toStringWithAddress(a)))(collection.breakOut)) + val contacts = Contacts(nodes.map(a ⇒ self.path.toStringWithAddress(a))(collection.breakOut)) + if (log.isDebugEnabled) + log.debug("Client [{}] gets contactPoints [{}] (all nodes)", sender().path, contacts.contactPoints.mkString(",")) + sender() ! contacts } else { // using toStringWithAddress in case the client is local, normally it is not, and // toStringWithAddress will use the remote address of the client @@ -630,7 +641,10 @@ final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterRecep if (first.size == numberOfContacts) first else first ++ nodes.take(numberOfContacts - first.size) } - sender() ! Contacts(slice.map(a ⇒ context.actorSelection(self.path.toStringWithAddress(a)))(collection.breakOut)) + val contacts = Contacts(slice.map(a ⇒ self.path.toStringWithAddress(a))(collection.breakOut)) + if (log.isDebugEnabled) + log.debug("Client [{}] gets contactPoints [{}]", sender().path, contacts.contactPoints.mkString(",")) + sender() ! contacts } case state: CurrentClusterState ⇒ diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/client/protobuf/ClusterClientMessageSerializer.scala b/akka-cluster-tools/src/main/scala/akka/cluster/client/protobuf/ClusterClientMessageSerializer.scala new file mode 100644 index 0000000000..62b22ebb3c --- /dev/null +++ b/akka-cluster-tools/src/main/scala/akka/cluster/client/protobuf/ClusterClientMessageSerializer.scala @@ -0,0 +1,69 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.cluster.client.protobuf + +import scala.collection.JavaConverters._ +import akka.actor.ExtendedActorSystem +import akka.serialization.BaseSerializer +import akka.serialization.SerializationExtension +import akka.serialization.SerializerWithStringManifest +import akka.cluster.client.ClusterReceptionist +import akka.cluster.client.protobuf.msg.{ ClusterClientMessages ⇒ cm } + +/** + * INTERNAL API: Serializer of ClusterClient messages. + */ +private[akka] class ClusterClientMessageSerializer(val system: ExtendedActorSystem) + extends SerializerWithStringManifest with BaseSerializer { + import ClusterReceptionist.Internal._ + + private lazy val serialization = SerializationExtension(system) + + private val ContactsManifest = "A" + private val GetContactsManifest = "B" + private val HeartbeatManifest = "C" + private val HeartbeatRspManifest = "D" + + private val emptyByteArray = Array.empty[Byte] + + private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] ⇒ AnyRef]( + ContactsManifest -> contactsFromBinary, + GetContactsManifest -> { _ ⇒ GetContacts }, + HeartbeatManifest -> { _ ⇒ Heartbeat }, + HeartbeatRspManifest -> { _ ⇒ HeartbeatRsp }) + + override def manifest(obj: AnyRef): String = obj match { + case _: Contacts ⇒ ContactsManifest + case GetContacts ⇒ GetContactsManifest + case Heartbeat ⇒ HeartbeatManifest + case HeartbeatRsp ⇒ HeartbeatRspManifest + case _ ⇒ + throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") + } + + override def toBinary(obj: AnyRef): Array[Byte] = obj match { + case m: Contacts ⇒ contactsToProto(m).toByteArray + case GetContacts ⇒ emptyByteArray + case Heartbeat ⇒ emptyByteArray + case HeartbeatRsp ⇒ emptyByteArray + case _ ⇒ + throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") + } + + override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = + fromBinaryMap.get(manifest) match { + case Some(f) ⇒ f(bytes) + case None ⇒ throw new IllegalArgumentException( + s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") + } + + private def contactsToProto(m: Contacts): cm.Contacts = + cm.Contacts.newBuilder().addAllContactPoints(m.contactPoints.asJava).build() + + private def contactsFromBinary(bytes: Array[Byte]): Contacts = { + val m = cm.Contacts.parseFrom(bytes) + Contacts(m.getContactPointsList.asScala.toVector) + } + +} diff --git a/akka-cluster-tools/src/test/scala/akka/cluster/client/protobuf/ClusterClientMessageSerializerSpec.scala b/akka-cluster-tools/src/test/scala/akka/cluster/client/protobuf/ClusterClientMessageSerializerSpec.scala new file mode 100644 index 0000000000..f05185d439 --- /dev/null +++ b/akka-cluster-tools/src/test/scala/akka/cluster/client/protobuf/ClusterClientMessageSerializerSpec.scala @@ -0,0 +1,33 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.cluster.client.protobuf + +import akka.actor.ExtendedActorSystem +import akka.testkit.AkkaSpec +import akka.cluster.client.ClusterReceptionist.Internal._ + +class ClusterClientMessageSerializerSpec extends AkkaSpec { + + val serializer = new ClusterClientMessageSerializer(system.asInstanceOf[ExtendedActorSystem]) + + def checkSerialization(obj: AnyRef): Unit = { + val blob = serializer.toBinary(obj) + val ref = serializer.fromBinary(blob, serializer.manifest(obj)) + ref should ===(obj) + } + + "ClusterClientMessages" must { + + "be serializable" in { + val contactPoints = Vector( + "akka.tcp://system@node-1:2552/system/receptionist", + "akka.tcp://system@node-2:2552/system/receptionist", + "akka.tcp://system@node-3:2552/system/receptionist") + checkSerialization(Contacts(contactPoints)) + checkSerialization(GetContacts) + checkSerialization(Heartbeat) + checkSerialization(HeartbeatRsp) + } + } +}