add serializer for typed ShardingEnvelope, #23772

This commit is contained in:
Patrik Nordwall 2017-10-03 14:56:23 +02:00
parent af411d2f8d
commit 985afe8020
10 changed files with 998 additions and 28 deletions

View file

@ -6,8 +6,8 @@ package akka.typed.cluster.internal
import akka.serialization.{ SerializationExtension, SerializerWithStringManifest }
import akka.typed.{ ActorRef, TypedSpec }
import akka.typed.TypedSpec.Create
import akka.typed.internal.adapter.ActorSystemAdapter
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.adapter._
import akka.typed.scaladsl.AskPattern._
import com.typesafe.config.ConfigFactory
@ -28,23 +28,22 @@ class MiscMessageSerializerSpec extends TypedSpec(MiscMessageSerializerSpec.conf
object `The typed MiscMessageSerializer` {
def `must serialize and deserialize typed actor refs `(): Unit = {
val serialization = SerializationExtension(system.toUntyped)
val ref = (system ? Create(Actor.empty[Unit], "some-actor")).futureValue
val serialization = SerializationExtension(ActorSystemAdapter.toUntyped(system))
val serializer = serialization.findSerializerFor(ref) match {
case s: SerializerWithStringManifest s
def checkSerialization(obj: AnyRef): Unit = {
serialization.findSerializerFor(obj) match {
case serializer: MiscMessageSerializer
val blob = serializer.toBinary(obj)
val ref = serializer.fromBinary(blob, serializer.manifest(obj))
ref should ===(obj)
case s
throw new IllegalStateException(s"Wrong serializer ${s.getClass} for ${obj.getClass}")
}
}
val manifest = serializer.manifest(ref)
val serialized = serializer.toBinary(ref)
val result = serializer.fromBinary(serialized, manifest)
result should ===(ref)
def `must serialize and deserialize typed actor refs `(): Unit = {
val ref = (system ? Create(Actor.empty[Unit], "some-actor")).futureValue
checkSerialization(ref)
}
}

View file

@ -16,10 +16,18 @@ import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.typed.cluster.Join
import org.scalatest.concurrent.Eventually
import akka.cluster.MemberStatus
import akka.actor.ExtendedActorSystem
import akka.serialization.SerializerWithStringManifest
import akka.typed.cluster.ActorRefResolver
import java.nio.charset.StandardCharsets
object ClusterShardingSpec {
val config = ConfigFactory.parseString(
"""
s"""
akka.actor.provider = cluster
// akka.loglevel = debug
@ -36,10 +44,18 @@ object ClusterShardingSpec {
akka.actor {
serialize-messages = off
allow-java-serialization = off
serializers {
test = "akka.typed.cluster.sharding.ClusterShardingSpec$$Serializer"
}
serialization-bindings {
"akka.typed.cluster.sharding.ClusterShardingSpec$$TestProtocol" = test
"akka.typed.cluster.sharding.ClusterShardingSpec$$IdTestProtocol" = test
}
}
""".stripMargin)
sealed trait TestProtocol
sealed trait TestProtocol extends java.io.Serializable
final case class ReplyPlz(toMe: ActorRef[String]) extends TestProtocol
final case class WhoAreYou(replyTo: ActorRef[String]) extends TestProtocol
final case class StopPlz() extends TestProtocol
@ -49,9 +65,59 @@ object ClusterShardingSpec {
final case class IdWhoAreYou(id: String, replyTo: ActorRef[String]) extends IdTestProtocol
final case class IdStopPlz(id: String) extends IdTestProtocol
class Serializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
def identifier: Int = 48
def manifest(o: AnyRef): String = o match {
case _: ReplyPlz "a"
case _: WhoAreYou "b"
case _: StopPlz "c"
case _: IdReplyPlz "A"
case _: IdWhoAreYou "B"
case _: IdStopPlz "C"
}
private def actorRefToBinary(ref: ActorRef[_]): Array[Byte] =
ActorRefResolver(system.toTyped).toSerializationFormat(ref).getBytes(StandardCharsets.UTF_8)
private def idAndRefToBinary(id: String, ref: ActorRef[_]): Array[Byte] = {
val idBytes = id.getBytes(StandardCharsets.UTF_8)
val refBytes = actorRefToBinary(ref)
// yeah, very ad-hoc ;-)
Array(idBytes.length.toByte) ++ idBytes ++ refBytes
}
def toBinary(o: AnyRef): Array[Byte] = o match {
case ReplyPlz(ref) actorRefToBinary(ref)
case WhoAreYou(ref) actorRefToBinary(ref)
case _: StopPlz Array.emptyByteArray
case IdReplyPlz(id, ref) idAndRefToBinary(id, ref)
case IdWhoAreYou(id, ref) idAndRefToBinary(id, ref)
case IdStopPlz(id) id.getBytes(StandardCharsets.UTF_8)
}
private def actorRefFromBinary[T](bytes: Array[Byte]): ActorRef[T] =
ActorRefResolver(system.toTyped).resolveActorRef(new String(bytes, StandardCharsets.UTF_8))
private def idAndRefFromBinary[T](bytes: Array[Byte]): (String, ActorRef[T]) = {
val idLength = bytes(0)
val id = new String(bytes.slice(1, idLength), StandardCharsets.UTF_8)
val ref = actorRefFromBinary(bytes.drop(1 + idLength))
(id, ref)
}
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
case "a" ReplyPlz(actorRefFromBinary(bytes))
case "b" WhoAreYou(actorRefFromBinary(bytes))
case "c" StopPlz()
case "A" IdReplyPlz.tupled(idAndRefFromBinary(bytes))
case "B" IdWhoAreYou.tupled(idAndRefFromBinary(bytes))
case "C" IdStopPlz(new String(bytes, StandardCharsets.UTF_8))
}
}
}
class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with ScalaFutures {
class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with ScalaFutures with Eventually {
import akka.typed.scaladsl.adapter._
import ClusterShardingSpec._
@ -60,7 +126,15 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca
val sharding = ClusterSharding(system)
implicit val untypedSystem = system.toUntyped
private val untypedCluster = akka.cluster.Cluster(untypedSystem)
val untypedSystem2 = akka.actor.ActorSystem(system.name, system.settings.config)
val system2 = untypedSystem2.toTyped
val sharding2 = ClusterSharding(system2)
override def afterAll(): Unit = {
Await.result(system2.terminate, timeout.duration)
super.afterAll()
}
val typeKey = EntityTypeKey[TestProtocol]("envelope-shard")
val behavior = Actor.immutable[TestProtocol] {
@ -92,9 +166,22 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca
object `Typed cluster sharding` {
untypedCluster.join(untypedCluster.selfAddress)
def `01 must join cluster`(): Unit = {
Cluster(system).manager ! Join(Cluster(system).selfMember.address)
Cluster(system2).manager ! Join(Cluster(system).selfMember.address)
def `01 must send messsages via cluster sharding, using envelopes`(): Unit = {
eventually {
Cluster(system).state.members.map(_.status) should ===(Set(MemberStatus.Up))
Cluster(system).state.members.size should ===(2)
}
eventually {
Cluster(system2).state.members.map(_.status) should ===(Set(MemberStatus.Up))
Cluster(system2).state.members.size should ===(2)
}
}
def `02 must send messsages via cluster sharding, using envelopes`(): Unit = {
val ref = sharding.spawn(
behavior,
Props.empty,
@ -102,6 +189,13 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca
ClusterShardingSettings(system),
10,
StopPlz())
sharding2.spawn(
behavior,
Props.empty,
typeKey,
ClusterShardingSettings(system2),
10,
StopPlz())
val p = TestProbe[String]()
ref ! ShardingEnvelope("test", ReplyPlz(p.ref))
@ -109,7 +203,7 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca
ref ! ShardingEnvelope("test", StopPlz())
}
def `02 must send messsages via cluster sharding, without envelopes`(): Unit = {
def `03 must send messsages via cluster sharding, without envelopes`(): Unit = {
val ref = sharding.spawn(
behaviorWithId,
Props.empty,
@ -117,6 +211,13 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca
ClusterShardingSettings(system),
ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, _.id),
IdStopPlz("THE_ID_HERE"))
sharding2.spawn(
behaviorWithId,
Props.empty,
typeKey2,
ClusterShardingSettings(system2),
ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, _.id),
IdStopPlz("THE_ID_HERE"))
val p = TestProbe[String]()
ref ! IdReplyPlz("test", p.ref)
@ -125,7 +226,7 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca
ref ! IdStopPlz("test")
}
// def `03 fail if starting sharding for already used typeName, but with wrong type`(): Unit = {
// def `04 fail if starting sharding for already used typeName, but with wrong type`(): Unit = {
// val ex = intercept[Exception] {
// sharding.spawn(
// Actor.empty[String],
@ -140,8 +241,6 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca
// ex.getMessage should include("already started")
// }
untypedCluster.join(untypedCluster.selfAddress)
def `11 EntityRef - tell`(): Unit = {
val charlieRef = sharding.entityRefFor(typeKey, "charlie")

View file

@ -0,0 +1,38 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster.sharding
import akka.serialization.SerializationExtension
import akka.typed.TypedSpec
import akka.typed.cluster.sharding.internal.ShardingSerializer
import akka.typed.internal.adapter.ActorSystemAdapter
import akka.typed.scaladsl.AskPattern._
class ShardingSerializerSpec extends TypedSpec {
object `The typed ShardingSerializer` {
val serialization = SerializationExtension(ActorSystemAdapter.toUntyped(system))
def checkSerialization(obj: AnyRef): Unit = {
serialization.findSerializerFor(obj) match {
case serializer: ShardingSerializer
val blob = serializer.toBinary(obj)
val ref = serializer.fromBinary(blob, serializer.manifest(obj))
ref should ===(obj)
case s
throw new IllegalStateException(s"Wrong serializer ${s.getClass} for ${obj.getClass}")
}
}
def `must serialize and deserialize ShardingEnvelope`(): Unit = {
checkSerialization(ShardingEnvelope("abc", 42))
}
def `must serialize and deserialize StartEntity`(): Unit = {
checkSerialization(StartEntity("abc"))
}
}
}

View file

@ -0,0 +1,736 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: ShardingMessages.proto
package akka.typed.cluster.sharding.internal.protobuf;
public final class ShardingMessages {
private ShardingMessages() {}
public static void registerAllExtensions(
akka.protobuf.ExtensionRegistry registry) {
}
public interface ShardingEnvelopeOrBuilder
extends akka.protobuf.MessageOrBuilder {
// required string entityId = 1;
/**
* <code>required string entityId = 1;</code>
*/
boolean hasEntityId();
/**
* <code>required string entityId = 1;</code>
*/
java.lang.String getEntityId();
/**
* <code>required string entityId = 1;</code>
*/
akka.protobuf.ByteString
getEntityIdBytes();
// optional .Payload message = 2;
/**
* <code>optional .Payload message = 2;</code>
*/
boolean hasMessage();
/**
* <code>optional .Payload message = 2;</code>
*/
akka.remote.ContainerFormats.Payload getMessage();
/**
* <code>optional .Payload message = 2;</code>
*/
akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder();
}
/**
* Protobuf type {@code akka.typed.cluster.sharding.ShardingEnvelope}
*/
public static final class ShardingEnvelope extends
akka.protobuf.GeneratedMessage
implements ShardingEnvelopeOrBuilder {
// Use ShardingEnvelope.newBuilder() to construct.
private ShardingEnvelope(akka.protobuf.GeneratedMessage.Builder<?> builder) {
super(builder);
this.unknownFields = builder.getUnknownFields();
}
private ShardingEnvelope(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); }
private static final ShardingEnvelope defaultInstance;
public static ShardingEnvelope getDefaultInstance() {
return defaultInstance;
}
public ShardingEnvelope getDefaultInstanceForType() {
return defaultInstance;
}
private final akka.protobuf.UnknownFieldSet unknownFields;
@java.lang.Override
public final akka.protobuf.UnknownFieldSet
getUnknownFields() {
return this.unknownFields;
}
private ShardingEnvelope(
akka.protobuf.CodedInputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws akka.protobuf.InvalidProtocolBufferException {
initFields();
int mutable_bitField0_ = 0;
akka.protobuf.UnknownFieldSet.Builder unknownFields =
akka.protobuf.UnknownFieldSet.newBuilder();
try {
boolean done = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
done = true;
}
break;
}
case 10: {
bitField0_ |= 0x00000001;
entityId_ = input.readBytes();
break;
}
case 18: {
akka.remote.ContainerFormats.Payload.Builder subBuilder = null;
if (((bitField0_ & 0x00000002) == 0x00000002)) {
subBuilder = message_.toBuilder();
}
message_ = input.readMessage(akka.remote.ContainerFormats.Payload.PARSER, extensionRegistry);
if (subBuilder != null) {
subBuilder.mergeFrom(message_);
message_ = subBuilder.buildPartial();
}
bitField0_ |= 0x00000002;
break;
}
}
}
} catch (akka.protobuf.InvalidProtocolBufferException e) {
throw e.setUnfinishedMessage(this);
} catch (java.io.IOException e) {
throw new akka.protobuf.InvalidProtocolBufferException(
e.getMessage()).setUnfinishedMessage(this);
} finally {
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
}
public static final akka.protobuf.Descriptors.Descriptor
getDescriptor() {
return akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.internal_static_akka_typed_cluster_sharding_ShardingEnvelope_descriptor;
}
protected akka.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.internal_static_akka_typed_cluster_sharding_ShardingEnvelope_fieldAccessorTable
.ensureFieldAccessorsInitialized(
akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope.class, akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope.Builder.class);
}
public static akka.protobuf.Parser<ShardingEnvelope> PARSER =
new akka.protobuf.AbstractParser<ShardingEnvelope>() {
public ShardingEnvelope parsePartialFrom(
akka.protobuf.CodedInputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws akka.protobuf.InvalidProtocolBufferException {
return new ShardingEnvelope(input, extensionRegistry);
}
};
@java.lang.Override
public akka.protobuf.Parser<ShardingEnvelope> getParserForType() {
return PARSER;
}
private int bitField0_;
// required string entityId = 1;
public static final int ENTITYID_FIELD_NUMBER = 1;
private java.lang.Object entityId_;
/**
* <code>required string entityId = 1;</code>
*/
public boolean hasEntityId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required string entityId = 1;</code>
*/
public java.lang.String getEntityId() {
java.lang.Object ref = entityId_;
if (ref instanceof java.lang.String) {
return (java.lang.String) ref;
} else {
akka.protobuf.ByteString bs =
(akka.protobuf.ByteString) ref;
java.lang.String s = bs.toStringUtf8();
if (bs.isValidUtf8()) {
entityId_ = s;
}
return s;
}
}
/**
* <code>required string entityId = 1;</code>
*/
public akka.protobuf.ByteString
getEntityIdBytes() {
java.lang.Object ref = entityId_;
if (ref instanceof java.lang.String) {
akka.protobuf.ByteString b =
akka.protobuf.ByteString.copyFromUtf8(
(java.lang.String) ref);
entityId_ = b;
return b;
} else {
return (akka.protobuf.ByteString) ref;
}
}
// optional .Payload message = 2;
public static final int MESSAGE_FIELD_NUMBER = 2;
private akka.remote.ContainerFormats.Payload message_;
/**
* <code>optional .Payload message = 2;</code>
*/
public boolean hasMessage() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
* <code>optional .Payload message = 2;</code>
*/
public akka.remote.ContainerFormats.Payload getMessage() {
return message_;
}
/**
* <code>optional .Payload message = 2;</code>
*/
public akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder() {
return message_;
}
private void initFields() {
entityId_ = "";
message_ = akka.remote.ContainerFormats.Payload.getDefaultInstance();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
if (!hasEntityId()) {
memoizedIsInitialized = 0;
return false;
}
if (hasMessage()) {
if (!getMessage().isInitialized()) {
memoizedIsInitialized = 0;
return false;
}
}
memoizedIsInitialized = 1;
return true;
}
public void writeTo(akka.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeBytes(1, getEntityIdBytes());
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeMessage(2, message_);
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += akka.protobuf.CodedOutputStream
.computeBytesSize(1, getEntityIdBytes());
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += akka.protobuf.CodedOutputStream
.computeMessageSize(2, message_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@java.lang.Override
protected java.lang.Object writeReplace()
throws java.io.ObjectStreamException {
return super.writeReplace();
}
public static akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parseFrom(
akka.protobuf.ByteString data)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parseFrom(
akka.protobuf.ByteString data,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parseFrom(byte[] data)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parseFrom(
byte[] data,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parseFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parseFrom(
java.io.InputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input);
}
public static akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parseDelimitedFrom(
java.io.InputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input, extensionRegistry);
}
public static akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parseFrom(
akka.protobuf.CodedInputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parseFrom(
akka.protobuf.CodedInputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@java.lang.Override
protected Builder newBuilderForType(
akka.protobuf.GeneratedMessage.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
/**
* Protobuf type {@code akka.typed.cluster.sharding.ShardingEnvelope}
*/
public static final class Builder extends
akka.protobuf.GeneratedMessage.Builder<Builder>
implements akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelopeOrBuilder {
public static final akka.protobuf.Descriptors.Descriptor
getDescriptor() {
return akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.internal_static_akka_typed_cluster_sharding_ShardingEnvelope_descriptor;
}
protected akka.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.internal_static_akka_typed_cluster_sharding_ShardingEnvelope_fieldAccessorTable
.ensureFieldAccessorsInitialized(
akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope.class, akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope.Builder.class);
}
// Construct using akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(
akka.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
getMessageFieldBuilder();
}
}
private static Builder create() {
return new Builder();
}
public Builder clear() {
super.clear();
entityId_ = "";
bitField0_ = (bitField0_ & ~0x00000001);
if (messageBuilder_ == null) {
message_ = akka.remote.ContainerFormats.Payload.getDefaultInstance();
} else {
messageBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000002);
return this;
}
public Builder clone() {
return create().mergeFrom(buildPartial());
}
public akka.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.internal_static_akka_typed_cluster_sharding_ShardingEnvelope_descriptor;
}
public akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope getDefaultInstanceForType() {
return akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope.getDefaultInstance();
}
public akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope build() {
akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
public akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope buildPartial() {
akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope result = new akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
to_bitField0_ |= 0x00000001;
}
result.entityId_ = entityId_;
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
to_bitField0_ |= 0x00000002;
}
if (messageBuilder_ == null) {
result.message_ = message_;
} else {
result.message_ = messageBuilder_.build();
}
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
public Builder mergeFrom(akka.protobuf.Message other) {
if (other instanceof akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope) {
return mergeFrom((akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope other) {
if (other == akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope.getDefaultInstance()) return this;
if (other.hasEntityId()) {
bitField0_ |= 0x00000001;
entityId_ = other.entityId_;
onChanged();
}
if (other.hasMessage()) {
mergeMessage(other.getMessage());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
if (!hasEntityId()) {
return false;
}
if (hasMessage()) {
if (!getMessage().isInitialized()) {
return false;
}
}
return true;
}
public Builder mergeFrom(
akka.protobuf.CodedInputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (akka.protobuf.InvalidProtocolBufferException e) {
parsedMessage = (akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope) e.getUnfinishedMessage();
throw e;
} finally {
if (parsedMessage != null) {
mergeFrom(parsedMessage);
}
}
return this;
}
private int bitField0_;
// required string entityId = 1;
private java.lang.Object entityId_ = "";
/**
* <code>required string entityId = 1;</code>
*/
public boolean hasEntityId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required string entityId = 1;</code>
*/
public java.lang.String getEntityId() {
java.lang.Object ref = entityId_;
if (!(ref instanceof java.lang.String)) {
java.lang.String s = ((akka.protobuf.ByteString) ref)
.toStringUtf8();
entityId_ = s;
return s;
} else {
return (java.lang.String) ref;
}
}
/**
* <code>required string entityId = 1;</code>
*/
public akka.protobuf.ByteString
getEntityIdBytes() {
java.lang.Object ref = entityId_;
if (ref instanceof String) {
akka.protobuf.ByteString b =
akka.protobuf.ByteString.copyFromUtf8(
(java.lang.String) ref);
entityId_ = b;
return b;
} else {
return (akka.protobuf.ByteString) ref;
}
}
/**
* <code>required string entityId = 1;</code>
*/
public Builder setEntityId(
java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000001;
entityId_ = value;
onChanged();
return this;
}
/**
* <code>required string entityId = 1;</code>
*/
public Builder clearEntityId() {
bitField0_ = (bitField0_ & ~0x00000001);
entityId_ = getDefaultInstance().getEntityId();
onChanged();
return this;
}
/**
* <code>required string entityId = 1;</code>
*/
public Builder setEntityIdBytes(
akka.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000001;
entityId_ = value;
onChanged();
return this;
}
// optional .Payload message = 2;
private akka.remote.ContainerFormats.Payload message_ = akka.remote.ContainerFormats.Payload.getDefaultInstance();
private akka.protobuf.SingleFieldBuilder<
akka.remote.ContainerFormats.Payload, akka.remote.ContainerFormats.Payload.Builder, akka.remote.ContainerFormats.PayloadOrBuilder> messageBuilder_;
/**
* <code>optional .Payload message = 2;</code>
*/
public boolean hasMessage() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
* <code>optional .Payload message = 2;</code>
*/
public akka.remote.ContainerFormats.Payload getMessage() {
if (messageBuilder_ == null) {
return message_;
} else {
return messageBuilder_.getMessage();
}
}
/**
* <code>optional .Payload message = 2;</code>
*/
public Builder setMessage(akka.remote.ContainerFormats.Payload value) {
if (messageBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
message_ = value;
onChanged();
} else {
messageBuilder_.setMessage(value);
}
bitField0_ |= 0x00000002;
return this;
}
/**
* <code>optional .Payload message = 2;</code>
*/
public Builder setMessage(
akka.remote.ContainerFormats.Payload.Builder builderForValue) {
if (messageBuilder_ == null) {
message_ = builderForValue.build();
onChanged();
} else {
messageBuilder_.setMessage(builderForValue.build());
}
bitField0_ |= 0x00000002;
return this;
}
/**
* <code>optional .Payload message = 2;</code>
*/
public Builder mergeMessage(akka.remote.ContainerFormats.Payload value) {
if (messageBuilder_ == null) {
if (((bitField0_ & 0x00000002) == 0x00000002) &&
message_ != akka.remote.ContainerFormats.Payload.getDefaultInstance()) {
message_ =
akka.remote.ContainerFormats.Payload.newBuilder(message_).mergeFrom(value).buildPartial();
} else {
message_ = value;
}
onChanged();
} else {
messageBuilder_.mergeFrom(value);
}
bitField0_ |= 0x00000002;
return this;
}
/**
* <code>optional .Payload message = 2;</code>
*/
public Builder clearMessage() {
if (messageBuilder_ == null) {
message_ = akka.remote.ContainerFormats.Payload.getDefaultInstance();
onChanged();
} else {
messageBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000002);
return this;
}
/**
* <code>optional .Payload message = 2;</code>
*/
public akka.remote.ContainerFormats.Payload.Builder getMessageBuilder() {
bitField0_ |= 0x00000002;
onChanged();
return getMessageFieldBuilder().getBuilder();
}
/**
* <code>optional .Payload message = 2;</code>
*/
public akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder() {
if (messageBuilder_ != null) {
return messageBuilder_.getMessageOrBuilder();
} else {
return message_;
}
}
/**
* <code>optional .Payload message = 2;</code>
*/
private akka.protobuf.SingleFieldBuilder<
akka.remote.ContainerFormats.Payload, akka.remote.ContainerFormats.Payload.Builder, akka.remote.ContainerFormats.PayloadOrBuilder>
getMessageFieldBuilder() {
if (messageBuilder_ == null) {
messageBuilder_ = new akka.protobuf.SingleFieldBuilder<
akka.remote.ContainerFormats.Payload, akka.remote.ContainerFormats.Payload.Builder, akka.remote.ContainerFormats.PayloadOrBuilder>(
message_,
getParentForChildren(),
isClean());
message_ = null;
}
return messageBuilder_;
}
// @@protoc_insertion_point(builder_scope:akka.typed.cluster.sharding.ShardingEnvelope)
}
static {
defaultInstance = new ShardingEnvelope(true);
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:akka.typed.cluster.sharding.ShardingEnvelope)
}
private static akka.protobuf.Descriptors.Descriptor
internal_static_akka_typed_cluster_sharding_ShardingEnvelope_descriptor;
private static
akka.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_akka_typed_cluster_sharding_ShardingEnvelope_fieldAccessorTable;
public static akka.protobuf.Descriptors.FileDescriptor
getDescriptor() {
return descriptor;
}
private static akka.protobuf.Descriptors.FileDescriptor
descriptor;
static {
java.lang.String[] descriptorData = {
"\n\026ShardingMessages.proto\022\033akka.typed.clu" +
"ster.sharding\032\026ContainerFormats.proto\"?\n" +
"\020ShardingEnvelope\022\020\n\010entityId\030\001 \002(\t\022\031\n\007m" +
"essage\030\002 \001(\0132\010.PayloadB1\n-akka.typed.clu" +
"ster.sharding.internal.protobufH\001"
};
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
public akka.protobuf.ExtensionRegistry assignDescriptors(
akka.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
internal_static_akka_typed_cluster_sharding_ShardingEnvelope_descriptor =
getDescriptor().getMessageTypes().get(0);
internal_static_akka_typed_cluster_sharding_ShardingEnvelope_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_akka_typed_cluster_sharding_ShardingEnvelope_descriptor,
new java.lang.String[] { "EntityId", "Message", });
return null;
}
};
akka.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
new akka.protobuf.Descriptors.FileDescriptor[] {
akka.remote.ContainerFormats.getDescriptor(),
}, assigner);
}
// @@protoc_insertion_point(outer_class_scope)
}

View file

@ -0,0 +1,13 @@
/**
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster.sharding;
option java_package = "akka.typed.cluster.sharding.internal.protobuf";
option optimize_for = SPEED;
import "ContainerFormats.proto";
message ShardingEnvelope {
required string entityId = 1;
optional Payload message = 2;
}

View file

@ -33,12 +33,15 @@ akka.typed {
akka.actor {
serializers {
typed-misc = "akka.typed.cluster.internal.MiscMessageSerializer"
typed-sharding = "akka.typed.cluster.sharding.internal.ShardingSerializer"
}
serialization-identifiers {
"akka.typed.cluster.internal.MiscMessageSerializer" = 24
"akka.typed.cluster.sharding.internal.ShardingSerializer" = 25
}
serialization-bindings {
"akka.typed.ActorRef" = typed-misc
"akka.typed.internal.adapter.ActorRefAdapter" = typed-misc
"akka.typed.cluster.sharding.ShardingEnvelope" = typed-sharding
}
}

View file

@ -11,6 +11,7 @@ import akka.typed.ActorRef
import akka.typed.cluster.ActorRefResolver
import akka.typed.internal.adapter.ActorRefAdapter
import akka.typed.scaladsl.adapter._
import java.io.NotSerializableException
@InternalApi
class MiscMessageSerializer(val system: akka.actor.ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer {
@ -19,14 +20,21 @@ class MiscMessageSerializer(val system: akka.actor.ExtendedActorSystem) extends
def manifest(o: AnyRef) = o match {
case ref: ActorRef[_] "a"
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
}
def toBinary(o: AnyRef) = o match {
case ref: ActorRef[_] resolver.toSerializationFormat(ref).getBytes(StandardCharsets.UTF_8)
case _
throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]")
}
def fromBinary(bytes: Array[Byte], manifest: String) = manifest match {
case "a" resolver.resolveActorRef(new String(bytes, StandardCharsets.UTF_8))
case _
throw new NotSerializableException(
s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
}
}

View file

@ -0,0 +1,65 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster.sharding.internal
import akka.typed.cluster.sharding.internal.protobuf.ShardingMessages
import java.nio.charset.StandardCharsets
import akka.annotation.InternalApi
import akka.serialization.{ BaseSerializer, SerializerWithStringManifest }
import akka.typed.ActorRef
import akka.typed.cluster.ActorRefResolver
import akka.typed.internal.adapter.ActorRefAdapter
import akka.typed.scaladsl.adapter._
import akka.remote.serialization.WrappedPayloadSupport
import akka.typed.cluster.sharding.ShardingEnvelope
import java.io.NotSerializableException
import akka.typed.cluster.sharding.StartEntity
/**
* INTERNAL API
*/
@InternalApi private[akka] class ShardingSerializer(val system: akka.actor.ExtendedActorSystem)
extends SerializerWithStringManifest with BaseSerializer {
private val payloadSupport = new WrappedPayloadSupport(system)
private val ShardingEnvelopeManifest = "a"
override def manifest(o: AnyRef): String = o match {
case ref: ShardingEnvelope[_] ShardingEnvelopeManifest
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
}
override def toBinary(o: AnyRef): Array[Byte] = o match {
case env: ShardingEnvelope[_]
val builder = ShardingMessages.ShardingEnvelope.newBuilder()
builder.setEntityId(env.entityId)
// special null for StartEntity, might change with issue #23679
if (env.message != null)
builder.setMessage(payloadSupport.payloadBuilder(env.message))
builder.build().toByteArray()
case _
throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]")
}
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
case ShardingEnvelopeManifest
val env = ShardingMessages.ShardingEnvelope.parseFrom(bytes)
val entityId = env.getEntityId
if (env.hasMessage) {
val wrappedMsg = payloadSupport.deserializePayload(env.getMessage)
ShardingEnvelope(entityId, wrappedMsg)
} else {
// special for StartEntity, might change with issue #23679
StartEntity(entityId)
}
case _
throw new NotSerializableException(
s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
}
}

View file

@ -372,6 +372,8 @@ lazy val typed = akkaModule("akka-typed")
)
.settings(AkkaBuild.mayChangeSettings)
.settings(AutomaticModuleName.settings("akka.typed")) // fine for now, eventually new module name to become typed.actor
// To be ablet to import ContainerFormats.proto
.settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf" ))
.settings(
initialCommands := """
import akka.typed._

View file

@ -16,6 +16,7 @@ import Keys._
object Protobuf {
val paths = SettingKey[Seq[File]]("protobuf-paths", "The paths that contain *.proto files.")
val outputPaths = SettingKey[Seq[File]]("protobuf-output-paths", "The paths where to save the generated *.java files.")
val importPath = SettingKey[Option[File]]("protobuf-import-path", "The path that contain additional *.proto files that can be imported.")
val protoc = SettingKey[String]("protobuf-protoc", "The path and name of the protoc executable.")
val protocVersion = SettingKey[String]("protobuf-protoc-version", "The version of the protoc executable.")
val generate = TaskKey[Unit]("protobuf-generate", "Compile the protobuf sources and do all processing.")
@ -23,6 +24,7 @@ object Protobuf {
lazy val settings: Seq[Setting[_]] = Seq(
paths := Seq((sourceDirectory in Compile).value, (sourceDirectory in Test).value).map(_ / "protobuf"),
outputPaths := Seq((sourceDirectory in Compile).value, (sourceDirectory in Test).value).map(_ / "java"),
importPath := None,
protoc := "protoc",
protocVersion := "2.5.0",
generate := {
@ -48,7 +50,7 @@ object Protobuf {
val relative = src.relativeTo(sources).getOrElse(throw new Exception(s"path $src is not a in source tree $sources")).toString
val tmp = targets / "protoc" / relative
IO.delete(tmp)
generate(cmd, src, tmp, log)
generate(cmd, src, tmp, log, importPath.value)
transformDirectory(tmp, dst, _ true, transformFile(_.replace("com.google.protobuf", "akka.protobuf")), cache, log)
}
}
@ -71,7 +73,7 @@ object Protobuf {
}
}
private def generate(protoc: String, srcDir: File, targetDir: File, log: Logger): Unit = {
private def generate(protoc: String, srcDir: File, targetDir: File, log: Logger, importPath: Option[File]): Unit = {
val protoFiles = (srcDir ** "*.proto").get
if (srcDir.exists)
if (protoFiles.isEmpty)
@ -82,8 +84,13 @@ object Protobuf {
log.info("Generating %d protobuf files from %s to %s".format(protoFiles.size, srcDir, targetDir))
protoFiles.foreach { proto log.info("Compiling %s" format proto) }
val protoPathArg = importPath match {
case None => Nil
case Some(p) => Seq("--proto_path", p.absolutePath)
}
val exitCode = callProtoc(protoc, Seq("-I" + srcDir.absolutePath, "--java_out=%s" format targetDir.absolutePath) ++
protoFiles.map(_.absolutePath), log, { (p, l) p ! l })
protoPathArg ++ protoFiles.map(_.absolutePath), log, { (p, l) => p ! l })
if (exitCode != 0)
sys.error("protoc returned exit code: %d" format exitCode)
}