diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/internal/MiscMessageSerializerSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/internal/MiscMessageSerializerSpec.scala index a02ec5c605..01ca2830a3 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/cluster/internal/MiscMessageSerializerSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/internal/MiscMessageSerializerSpec.scala @@ -6,8 +6,8 @@ package akka.typed.cluster.internal import akka.serialization.{ SerializationExtension, SerializerWithStringManifest } import akka.typed.{ ActorRef, TypedSpec } import akka.typed.TypedSpec.Create -import akka.typed.internal.adapter.ActorSystemAdapter import akka.typed.scaladsl.Actor +import akka.typed.scaladsl.adapter._ import akka.typed.scaladsl.AskPattern._ import com.typesafe.config.ConfigFactory @@ -28,23 +28,22 @@ class MiscMessageSerializerSpec extends TypedSpec(MiscMessageSerializerSpec.conf object `The typed MiscMessageSerializer` { - def `must serialize and deserialize typed actor refs `(): Unit = { + val serialization = SerializationExtension(system.toUntyped) - val ref = (system ? 
Create(Actor.empty[Unit], "some-actor")).futureValue - - val serialization = SerializationExtension(ActorSystemAdapter.toUntyped(system)) - - val serializer = serialization.findSerializerFor(ref) match { - case s: SerializerWithStringManifest ⇒ s + def checkSerialization(obj: AnyRef): Unit = { + serialization.findSerializerFor(obj) match { + case serializer: MiscMessageSerializer ⇒ + val blob = serializer.toBinary(obj) + val ref = serializer.fromBinary(blob, serializer.manifest(obj)) + ref should ===(obj) + case s ⇒ + throw new IllegalStateException(s"Wrong serializer ${s.getClass} for ${obj.getClass}") } + } - val manifest = serializer.manifest(ref) - val serialized = serializer.toBinary(ref) - - val result = serializer.fromBinary(serialized, manifest) - - result should ===(ref) - + def `must serialize and deserialize typed actor refs `(): Unit = { + val ref = (system ? Create(Actor.empty[Unit], "some-actor")).futureValue + checkSerialization(ref) } } diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingSpec.scala index 187ed4c339..45811052d4 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingSpec.scala @@ -16,10 +16,18 @@ import com.typesafe.config.ConfigFactory import org.scalatest.concurrent.ScalaFutures import scala.concurrent.duration._ +import scala.concurrent.Await +import akka.typed.cluster.Join +import org.scalatest.concurrent.Eventually +import akka.cluster.MemberStatus +import akka.actor.ExtendedActorSystem +import akka.serialization.SerializerWithStringManifest +import akka.typed.cluster.ActorRefResolver +import java.nio.charset.StandardCharsets object ClusterShardingSpec { val config = ConfigFactory.parseString( - """ + s""" akka.actor.provider = cluster // akka.loglevel = debug @@ -36,10 +44,18 @@ 
object ClusterShardingSpec { akka.actor { serialize-messages = off allow-java-serialization = off + + serializers { + test = "akka.typed.cluster.sharding.ClusterShardingSpec$$Serializer" + } + serialization-bindings { + "akka.typed.cluster.sharding.ClusterShardingSpec$$TestProtocol" = test + "akka.typed.cluster.sharding.ClusterShardingSpec$$IdTestProtocol" = test + } } """.stripMargin) - sealed trait TestProtocol + sealed trait TestProtocol extends java.io.Serializable final case class ReplyPlz(toMe: ActorRef[String]) extends TestProtocol final case class WhoAreYou(replyTo: ActorRef[String]) extends TestProtocol final case class StopPlz() extends TestProtocol @@ -49,9 +65,59 @@ object ClusterShardingSpec { final case class IdWhoAreYou(id: String, replyTo: ActorRef[String]) extends IdTestProtocol final case class IdStopPlz(id: String) extends IdTestProtocol + class Serializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { + def identifier: Int = 48 + def manifest(o: AnyRef): String = o match { + case _: ReplyPlz ⇒ "a" + case _: WhoAreYou ⇒ "b" + case _: StopPlz ⇒ "c" + case _: IdReplyPlz ⇒ "A" + case _: IdWhoAreYou ⇒ "B" + case _: IdStopPlz ⇒ "C" + } + + private def actorRefToBinary(ref: ActorRef[_]): Array[Byte] = + ActorRefResolver(system.toTyped).toSerializationFormat(ref).getBytes(StandardCharsets.UTF_8) + + private def idAndRefToBinary(id: String, ref: ActorRef[_]): Array[Byte] = { + val idBytes = id.getBytes(StandardCharsets.UTF_8) + val refBytes = actorRefToBinary(ref) + // yeah, very ad-hoc ;-) + Array(idBytes.length.toByte) ++ idBytes ++ refBytes + } + + def toBinary(o: AnyRef): Array[Byte] = o match { + case ReplyPlz(ref) ⇒ actorRefToBinary(ref) + case WhoAreYou(ref) ⇒ actorRefToBinary(ref) + case _: StopPlz ⇒ Array.emptyByteArray + case IdReplyPlz(id, ref) ⇒ idAndRefToBinary(id, ref) + case IdWhoAreYou(id, ref) ⇒ idAndRefToBinary(id, ref) + case IdStopPlz(id) ⇒ id.getBytes(StandardCharsets.UTF_8) + } + + private def 
actorRefFromBinary[T](bytes: Array[Byte]): ActorRef[T] = + ActorRefResolver(system.toTyped).resolveActorRef(new String(bytes, StandardCharsets.UTF_8)) + + private def idAndRefFromBinary[T](bytes: Array[Byte]): (String, ActorRef[T]) = { + val idLength = bytes(0) + val id = new String(bytes.slice(1, idLength + 1), StandardCharsets.UTF_8) + val ref = actorRefFromBinary(bytes.drop(1 + idLength)) + (id, ref) + } + + def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { + case "a" ⇒ ReplyPlz(actorRefFromBinary(bytes)) + case "b" ⇒ WhoAreYou(actorRefFromBinary(bytes)) + case "c" ⇒ StopPlz() + case "A" ⇒ IdReplyPlz.tupled(idAndRefFromBinary(bytes)) + case "B" ⇒ IdWhoAreYou.tupled(idAndRefFromBinary(bytes)) + case "C" ⇒ IdStopPlz(new String(bytes, StandardCharsets.UTF_8)) + } + } + } -class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with ScalaFutures { +class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with ScalaFutures with Eventually { import akka.typed.scaladsl.adapter._ import ClusterShardingSpec._ @@ -60,7 +126,15 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca val sharding = ClusterSharding(system) implicit val untypedSystem = system.toUntyped - private val untypedCluster = akka.cluster.Cluster(untypedSystem) + + val untypedSystem2 = akka.actor.ActorSystem(system.name, system.settings.config) + val system2 = untypedSystem2.toTyped + val sharding2 = ClusterSharding(system2) + + override def afterAll(): Unit = { + Await.result(system2.terminate, timeout.duration) + super.afterAll() + } val typeKey = EntityTypeKey[TestProtocol]("envelope-shard") val behavior = Actor.immutable[TestProtocol] { @@ -92,9 +166,22 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca object `Typed cluster sharding` { - untypedCluster.join(untypedCluster.selfAddress) + def `01 must join cluster`(): Unit = { + Cluster(system).manager ! 
Join(Cluster(system).selfMember.address) + Cluster(system2).manager ! Join(Cluster(system).selfMember.address) - def `01 must send messsages via cluster sharding, using envelopes`(): Unit = { + eventually { + Cluster(system).state.members.map(_.status) should ===(Set(MemberStatus.Up)) + Cluster(system).state.members.size should ===(2) + } + eventually { + Cluster(system2).state.members.map(_.status) should ===(Set(MemberStatus.Up)) + Cluster(system2).state.members.size should ===(2) + } + + } + + def `02 must send messsages via cluster sharding, using envelopes`(): Unit = { val ref = sharding.spawn( behavior, Props.empty, @@ -102,6 +189,13 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca ClusterShardingSettings(system), 10, StopPlz()) + sharding2.spawn( + behavior, + Props.empty, + typeKey, + ClusterShardingSettings(system2), + 10, + StopPlz()) val p = TestProbe[String]() ref ! ShardingEnvelope("test", ReplyPlz(p.ref)) @@ -109,7 +203,7 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca ref ! ShardingEnvelope("test", StopPlz()) } - def `02 must send messsages via cluster sharding, without envelopes`(): Unit = { + def `03 must send messsages via cluster sharding, without envelopes`(): Unit = { val ref = sharding.spawn( behaviorWithId, Props.empty, @@ -117,6 +211,13 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca ClusterShardingSettings(system), ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, _.id), IdStopPlz("THE_ID_HERE")) + sharding2.spawn( + behaviorWithId, + Props.empty, + typeKey2, + ClusterShardingSettings(system2), + ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, _.id), + IdStopPlz("THE_ID_HERE")) val p = TestProbe[String]() ref ! IdReplyPlz("test", p.ref) @@ -125,7 +226,7 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca ref ! 
IdStopPlz("test") } - // def `03 fail if starting sharding for already used typeName, but with wrong type`(): Unit = { + // def `04 fail if starting sharding for already used typeName, but with wrong type`(): Unit = { // val ex = intercept[Exception] { // sharding.spawn( // Actor.empty[String], @@ -140,8 +241,6 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca // ex.getMessage should include("already started") // } - untypedCluster.join(untypedCluster.selfAddress) - def `11 EntityRef - tell`(): Unit = { val charlieRef = sharding.entityRefFor(typeKey, "charlie") diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ShardingSerializerSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ShardingSerializerSpec.scala new file mode 100644 index 0000000000..ef91f3f6df --- /dev/null +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ShardingSerializerSpec.scala @@ -0,0 +1,38 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. 
+ */ +package akka.typed.cluster.sharding + +import akka.serialization.SerializationExtension +import akka.typed.TypedSpec +import akka.typed.cluster.sharding.internal.ShardingSerializer +import akka.typed.internal.adapter.ActorSystemAdapter +import akka.typed.scaladsl.AskPattern._ + +class ShardingSerializerSpec extends TypedSpec { + + object `The typed ShardingSerializer` { + + val serialization = SerializationExtension(ActorSystemAdapter.toUntyped(system)) + + def checkSerialization(obj: AnyRef): Unit = { + serialization.findSerializerFor(obj) match { + case serializer: ShardingSerializer ⇒ + val blob = serializer.toBinary(obj) + val ref = serializer.fromBinary(blob, serializer.manifest(obj)) + ref should ===(obj) + case s ⇒ + throw new IllegalStateException(s"Wrong serializer ${s.getClass} for ${obj.getClass}") + } + } + + def `must serialize and deserialize ShardingEnvelope`(): Unit = { + checkSerialization(ShardingEnvelope("abc", 42)) + } + + def `must serialize and deserialize StartEntity`(): Unit = { + checkSerialization(StartEntity("abc")) + } + } + +} diff --git a/akka-typed/src/main/java/akka/typed/cluster/sharding/internal/protobuf/ShardingMessages.java b/akka-typed/src/main/java/akka/typed/cluster/sharding/internal/protobuf/ShardingMessages.java new file mode 100644 index 0000000000..665dd9344e --- /dev/null +++ b/akka-typed/src/main/java/akka/typed/cluster/sharding/internal/protobuf/ShardingMessages.java @@ -0,0 +1,736 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! 
+// source: ShardingMessages.proto + +package akka.typed.cluster.sharding.internal.protobuf; + +public final class ShardingMessages { + private ShardingMessages() {} + public static void registerAllExtensions( + akka.protobuf.ExtensionRegistry registry) { + } + public interface ShardingEnvelopeOrBuilder + extends akka.protobuf.MessageOrBuilder { + + // required string entityId = 1; + /** + * required string entityId = 1; + */ + boolean hasEntityId(); + /** + * required string entityId = 1; + */ + java.lang.String getEntityId(); + /** + * required string entityId = 1; + */ + akka.protobuf.ByteString + getEntityIdBytes(); + + // optional .Payload message = 2; + /** + * optional .Payload message = 2; + */ + boolean hasMessage(); + /** + * optional .Payload message = 2; + */ + akka.remote.ContainerFormats.Payload getMessage(); + /** + * optional .Payload message = 2; + */ + akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder(); + } + /** + * Protobuf type {@code akka.typed.cluster.sharding.ShardingEnvelope} + */ + public static final class ShardingEnvelope extends + akka.protobuf.GeneratedMessage + implements ShardingEnvelopeOrBuilder { + // Use ShardingEnvelope.newBuilder() to construct. 
+ private ShardingEnvelope(akka.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private ShardingEnvelope(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final ShardingEnvelope defaultInstance; + public static ShardingEnvelope getDefaultInstance() { + return defaultInstance; + } + + public ShardingEnvelope getDefaultInstanceForType() { + return defaultInstance; + } + + private final akka.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final akka.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private ShardingEnvelope( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + akka.protobuf.UnknownFieldSet.Builder unknownFields = + akka.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + entityId_ = input.readBytes(); + break; + } + case 18: { + akka.remote.ContainerFormats.Payload.Builder subBuilder = null; + if (((bitField0_ & 0x00000002) == 0x00000002)) { + subBuilder = message_.toBuilder(); + } + message_ = input.readMessage(akka.remote.ContainerFormats.Payload.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(message_); + message_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000002; + break; + } + } + } + } catch (akka.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobuf.InvalidProtocolBufferException( + 
e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.internal_static_akka_typed_cluster_sharding_ShardingEnvelope_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.internal_static_akka_typed_cluster_sharding_ShardingEnvelope_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope.class, akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope.Builder.class); + } + + public static akka.protobuf.Parser PARSER = + new akka.protobuf.AbstractParser() { + public ShardingEnvelope parsePartialFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return new ShardingEnvelope(input, extensionRegistry); + } + }; + + @java.lang.Override + public akka.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // required string entityId = 1; + public static final int ENTITYID_FIELD_NUMBER = 1; + private java.lang.Object entityId_; + /** + * required string entityId = 1; + */ + public boolean hasEntityId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string entityId = 1; + */ + public java.lang.String getEntityId() { + java.lang.Object ref = entityId_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + akka.protobuf.ByteString bs = + (akka.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + entityId_ = s; + } + return s; + } + } + /** + * required string entityId = 
1; + */ + public akka.protobuf.ByteString + getEntityIdBytes() { + java.lang.Object ref = entityId_; + if (ref instanceof java.lang.String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + entityId_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + + // optional .Payload message = 2; + public static final int MESSAGE_FIELD_NUMBER = 2; + private akka.remote.ContainerFormats.Payload message_; + /** + * optional .Payload message = 2; + */ + public boolean hasMessage() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional .Payload message = 2; + */ + public akka.remote.ContainerFormats.Payload getMessage() { + return message_; + } + /** + * optional .Payload message = 2; + */ + public akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder() { + return message_; + } + + private void initFields() { + entityId_ = ""; + message_ = akka.remote.ContainerFormats.Payload.getDefaultInstance(); + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasEntityId()) { + memoizedIsInitialized = 0; + return false; + } + if (hasMessage()) { + if (!getMessage().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(akka.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getEntityIdBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeMessage(2, message_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { 
+ size += akka.protobuf.CodedOutputStream + .computeBytesSize(1, getEntityIdBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += akka.protobuf.CodedOutputStream + .computeMessageSize(2, message_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parseFrom( + akka.protobuf.ByteString data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parseFrom( + akka.protobuf.ByteString data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parseFrom(byte[] data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parseFrom( + byte[] data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parseFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + 
return PARSER.parseFrom(input, extensionRegistry); + } + public static akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parseDelimitedFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parseFrom( + akka.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parseFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code akka.typed.cluster.sharding.ShardingEnvelope} + */ + public static final class Builder extends + akka.protobuf.GeneratedMessage.Builder + implements akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelopeOrBuilder { + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return 
akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.internal_static_akka_typed_cluster_sharding_ShardingEnvelope_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.internal_static_akka_typed_cluster_sharding_ShardingEnvelope_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope.class, akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope.Builder.class); + } + + // Construct using akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getMessageFieldBuilder(); + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + entityId_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + if (messageBuilder_ == null) { + message_ = akka.remote.ContainerFormats.Payload.getDefaultInstance(); + } else { + messageBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public akka.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.internal_static_akka_typed_cluster_sharding_ShardingEnvelope_descriptor; + } + + public akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope getDefaultInstanceForType() { + return akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope.getDefaultInstance(); + } 
+ + public akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope build() { + akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope buildPartial() { + akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope result = new akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.entityId_ = entityId_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + if (messageBuilder_ == null) { + result.message_ = message_; + } else { + result.message_ = messageBuilder_.build(); + } + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(akka.protobuf.Message other) { + if (other instanceof akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope) { + return mergeFrom((akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope other) { + if (other == akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope.getDefaultInstance()) return this; + if (other.hasEntityId()) { + bitField0_ |= 0x00000001; + entityId_ = other.entityId_; + onChanged(); + } + if (other.hasMessage()) { + mergeMessage(other.getMessage()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasEntityId()) { + + return false; + } + if 
(hasMessage()) { + if (!getMessage().isInitialized()) { + + return false; + } + } + return true; + } + + public Builder mergeFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (akka.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (akka.typed.cluster.sharding.internal.protobuf.ShardingMessages.ShardingEnvelope) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required string entityId = 1; + private java.lang.Object entityId_ = ""; + /** + * required string entityId = 1; + */ + public boolean hasEntityId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string entityId = 1; + */ + public java.lang.String getEntityId() { + java.lang.Object ref = entityId_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((akka.protobuf.ByteString) ref) + .toStringUtf8(); + entityId_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * required string entityId = 1; + */ + public akka.protobuf.ByteString + getEntityIdBytes() { + java.lang.Object ref = entityId_; + if (ref instanceof String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + entityId_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + /** + * required string entityId = 1; + */ + public Builder setEntityId( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + entityId_ = value; + onChanged(); + return this; + } + /** + * required string entityId = 1; + */ + public Builder clearEntityId() { + bitField0_ 
= (bitField0_ & ~0x00000001); + entityId_ = getDefaultInstance().getEntityId(); + onChanged(); + return this; + } + /** + * required string entityId = 1; + */ + public Builder setEntityIdBytes( + akka.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + entityId_ = value; + onChanged(); + return this; + } + + // optional .Payload message = 2; + private akka.remote.ContainerFormats.Payload message_ = akka.remote.ContainerFormats.Payload.getDefaultInstance(); + private akka.protobuf.SingleFieldBuilder< + akka.remote.ContainerFormats.Payload, akka.remote.ContainerFormats.Payload.Builder, akka.remote.ContainerFormats.PayloadOrBuilder> messageBuilder_; + /** + * optional .Payload message = 2; + */ + public boolean hasMessage() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional .Payload message = 2; + */ + public akka.remote.ContainerFormats.Payload getMessage() { + if (messageBuilder_ == null) { + return message_; + } else { + return messageBuilder_.getMessage(); + } + } + /** + * optional .Payload message = 2; + */ + public Builder setMessage(akka.remote.ContainerFormats.Payload value) { + if (messageBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + message_ = value; + onChanged(); + } else { + messageBuilder_.setMessage(value); + } + bitField0_ |= 0x00000002; + return this; + } + /** + * optional .Payload message = 2; + */ + public Builder setMessage( + akka.remote.ContainerFormats.Payload.Builder builderForValue) { + if (messageBuilder_ == null) { + message_ = builderForValue.build(); + onChanged(); + } else { + messageBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000002; + return this; + } + /** + * optional .Payload message = 2; + */ + public Builder mergeMessage(akka.remote.ContainerFormats.Payload value) { + if (messageBuilder_ == null) { + if (((bitField0_ & 0x00000002) == 0x00000002) && + message_ != 
akka.remote.ContainerFormats.Payload.getDefaultInstance()) { + message_ = + akka.remote.ContainerFormats.Payload.newBuilder(message_).mergeFrom(value).buildPartial(); + } else { + message_ = value; + } + onChanged(); + } else { + messageBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000002; + return this; + } + /** + * optional .Payload message = 2; + */ + public Builder clearMessage() { + if (messageBuilder_ == null) { + message_ = akka.remote.ContainerFormats.Payload.getDefaultInstance(); + onChanged(); + } else { + messageBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + /** + * optional .Payload message = 2; + */ + public akka.remote.ContainerFormats.Payload.Builder getMessageBuilder() { + bitField0_ |= 0x00000002; + onChanged(); + return getMessageFieldBuilder().getBuilder(); + } + /** + * optional .Payload message = 2; + */ + public akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder() { + if (messageBuilder_ != null) { + return messageBuilder_.getMessageOrBuilder(); + } else { + return message_; + } + } + /** + * optional .Payload message = 2; + */ + private akka.protobuf.SingleFieldBuilder< + akka.remote.ContainerFormats.Payload, akka.remote.ContainerFormats.Payload.Builder, akka.remote.ContainerFormats.PayloadOrBuilder> + getMessageFieldBuilder() { + if (messageBuilder_ == null) { + messageBuilder_ = new akka.protobuf.SingleFieldBuilder< + akka.remote.ContainerFormats.Payload, akka.remote.ContainerFormats.Payload.Builder, akka.remote.ContainerFormats.PayloadOrBuilder>( + message_, + getParentForChildren(), + isClean()); + message_ = null; + } + return messageBuilder_; + } + + // @@protoc_insertion_point(builder_scope:akka.typed.cluster.sharding.ShardingEnvelope) + } + + static { + defaultInstance = new ShardingEnvelope(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:akka.typed.cluster.sharding.ShardingEnvelope) + } + + private static 
akka.protobuf.Descriptors.Descriptor + internal_static_akka_typed_cluster_sharding_ShardingEnvelope_descriptor; + private static + akka.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_akka_typed_cluster_sharding_ShardingEnvelope_fieldAccessorTable; + + public static akka.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static akka.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\026ShardingMessages.proto\022\033akka.typed.clu" + + "ster.sharding\032\026ContainerFormats.proto\"?\n" + + "\020ShardingEnvelope\022\020\n\010entityId\030\001 \002(\t\022\031\n\007m" + + "essage\030\002 \001(\0132\010.PayloadB1\n-akka.typed.clu" + + "ster.sharding.internal.protobufH\001" + }; + akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public akka.protobuf.ExtensionRegistry assignDescriptors( + akka.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_akka_typed_cluster_sharding_ShardingEnvelope_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_akka_typed_cluster_sharding_ShardingEnvelope_fieldAccessorTable = new + akka.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_akka_typed_cluster_sharding_ShardingEnvelope_descriptor, + new java.lang.String[] { "EntityId", "Message", }); + return null; + } + }; + akka.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new akka.protobuf.Descriptors.FileDescriptor[] { + akka.remote.ContainerFormats.getDescriptor(), + }, assigner); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/akka-typed/src/main/protobuf/ShardingMessages.proto b/akka-typed/src/main/protobuf/ShardingMessages.proto new file mode 100644 index 0000000000..8cb13d8be2 --- /dev/null +++ 
b/akka-typed/src/main/protobuf/ShardingMessages.proto @@ -0,0 +1,13 @@ +/** + * Copyright (C) 2014-2017 Lightbend Inc. + */ +package akka.typed.cluster.sharding; + +option java_package = "akka.typed.cluster.sharding.internal.protobuf"; +option optimize_for = SPEED; +import "ContainerFormats.proto"; + +message ShardingEnvelope { + required string entityId = 1; + optional Payload message = 2; +} diff --git a/akka-typed/src/main/resources/reference.conf b/akka-typed/src/main/resources/reference.conf index 2395f7a91e..e4a4b46cae 100644 --- a/akka-typed/src/main/resources/reference.conf +++ b/akka-typed/src/main/resources/reference.conf @@ -33,12 +33,15 @@ akka.typed { akka.actor { serializers { typed-misc = "akka.typed.cluster.internal.MiscMessageSerializer" + typed-sharding = "akka.typed.cluster.sharding.internal.ShardingSerializer" } serialization-identifiers { "akka.typed.cluster.internal.MiscMessageSerializer" = 24 + "akka.typed.cluster.sharding.internal.ShardingSerializer" = 25 } serialization-bindings { "akka.typed.ActorRef" = typed-misc "akka.typed.internal.adapter.ActorRefAdapter" = typed-misc + "akka.typed.cluster.sharding.ShardingEnvelope" = typed-sharding } } diff --git a/akka-typed/src/main/scala/akka/typed/cluster/internal/MiscMessageSerializer.scala b/akka-typed/src/main/scala/akka/typed/cluster/internal/MiscMessageSerializer.scala index df40150b3c..9bab33931a 100644 --- a/akka-typed/src/main/scala/akka/typed/cluster/internal/MiscMessageSerializer.scala +++ b/akka-typed/src/main/scala/akka/typed/cluster/internal/MiscMessageSerializer.scala @@ -11,6 +11,7 @@ import akka.typed.ActorRef import akka.typed.cluster.ActorRefResolver import akka.typed.internal.adapter.ActorRefAdapter import akka.typed.scaladsl.adapter._ +import java.io.NotSerializableException @InternalApi class MiscMessageSerializer(val system: akka.actor.ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer { @@ -19,14 +20,21 @@ class MiscMessageSerializer(val system: 
akka.actor.ExtendedActorSystem) extends def manifest(o: AnyRef) = o match { case ref: ActorRef[_] ⇒ "a" + case _ ⇒ + throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]") } def toBinary(o: AnyRef) = o match { case ref: ActorRef[_] ⇒ resolver.toSerializationFormat(ref).getBytes(StandardCharsets.UTF_8) + case _ ⇒ + throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]") } def fromBinary(bytes: Array[Byte], manifest: String) = manifest match { case "a" ⇒ resolver.resolveActorRef(new String(bytes, StandardCharsets.UTF_8)) + case _ ⇒ + throw new NotSerializableException( + s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") } } diff --git a/akka-typed/src/main/scala/akka/typed/cluster/sharding/internal/ShardingSerializer.scala b/akka-typed/src/main/scala/akka/typed/cluster/sharding/internal/ShardingSerializer.scala new file mode 100644 index 0000000000..7688a398dc --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/cluster/sharding/internal/ShardingSerializer.scala @@ -0,0 +1,65 @@ +/** + * Copyright (C) 2017 Lightbend Inc. 
+ */ +package akka.typed.cluster.sharding.internal + +import akka.typed.cluster.sharding.internal.protobuf.ShardingMessages +import java.nio.charset.StandardCharsets + +import akka.annotation.InternalApi +import akka.serialization.{ BaseSerializer, SerializerWithStringManifest } +import akka.typed.ActorRef +import akka.typed.cluster.ActorRefResolver +import akka.typed.internal.adapter.ActorRefAdapter +import akka.typed.scaladsl.adapter._ +import akka.remote.serialization.WrappedPayloadSupport +import akka.typed.cluster.sharding.ShardingEnvelope +import java.io.NotSerializableException +import akka.typed.cluster.sharding.StartEntity + +/** + * INTERNAL API + */ +@InternalApi private[akka] class ShardingSerializer(val system: akka.actor.ExtendedActorSystem) + extends SerializerWithStringManifest with BaseSerializer { + + private val payloadSupport = new WrappedPayloadSupport(system) + + private val ShardingEnvelopeManifest = "a" + + override def manifest(o: AnyRef): String = o match { + case ref: ShardingEnvelope[_] ⇒ ShardingEnvelopeManifest + case _ ⇒ + throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]") + } + + override def toBinary(o: AnyRef): Array[Byte] = o match { + case env: ShardingEnvelope[_] ⇒ + val builder = ShardingMessages.ShardingEnvelope.newBuilder() + builder.setEntityId(env.entityId) + // special null for StartEntity, might change with issue #23679 + if (env.message != null) + builder.setMessage(payloadSupport.payloadBuilder(env.message)) + builder.build().toByteArray() + + case _ ⇒ + throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]") + } + + override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { + case ShardingEnvelopeManifest ⇒ + val env = ShardingMessages.ShardingEnvelope.parseFrom(bytes) + val entityId = env.getEntityId + if (env.hasMessage) { + val wrappedMsg = payloadSupport.deserializePayload(env.getMessage) + 
ShardingEnvelope(entityId, wrappedMsg) + } else { + // special for StartEntity, might change with issue #23679 + StartEntity(entityId) + } + case _ ⇒ + throw new NotSerializableException( + s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") + } + +} diff --git a/build.sbt b/build.sbt index a1f5ba1c44..4ee6ed1e3f 100644 --- a/build.sbt +++ b/build.sbt @@ -372,6 +372,8 @@ lazy val typed = akkaModule("akka-typed") ) .settings(AkkaBuild.mayChangeSettings) .settings(AutomaticModuleName.settings("akka.typed")) // fine for now, eventually new module name to become typed.actor + // To be able to import ContainerFormats.proto + .settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf" )) .settings( initialCommands := """ import akka.typed._ diff --git a/project/Protobuf.scala b/project/Protobuf.scala index 0a5cf73592..7f016ed55a 100644 --- a/project/Protobuf.scala +++ b/project/Protobuf.scala @@ -16,6 +16,7 @@ import Keys._ object Protobuf { val paths = SettingKey[Seq[File]]("protobuf-paths", "The paths that contain *.proto files.") val outputPaths = SettingKey[Seq[File]]("protobuf-output-paths", "The paths where to save the generated *.java files.") + val importPath = SettingKey[Option[File]]("protobuf-import-path", "The path that contain additional *.proto files that can be imported.") val protoc = SettingKey[String]("protobuf-protoc", "The path and name of the protoc executable.") val protocVersion = SettingKey[String]("protobuf-protoc-version", "The version of the protoc executable.") val generate = TaskKey[Unit]("protobuf-generate", "Compile the protobuf sources and do all processing.") @@ -23,6 +24,7 @@ object Protobuf { lazy val settings: Seq[Setting[_]] = Seq( paths := Seq((sourceDirectory in Compile).value, (sourceDirectory in Test).value).map(_ / "protobuf"), outputPaths := Seq((sourceDirectory in Compile).value, (sourceDirectory in Test).value).map(_ / "java"), +
importPath := None, protoc := "protoc", protocVersion := "2.5.0", generate := { @@ -48,7 +50,7 @@ object Protobuf { val relative = src.relativeTo(sources).getOrElse(throw new Exception(s"path $src is not a in source tree $sources")).toString val tmp = targets / "protoc" / relative IO.delete(tmp) - generate(cmd, src, tmp, log) + generate(cmd, src, tmp, log, importPath.value) transformDirectory(tmp, dst, _ ⇒ true, transformFile(_.replace("com.google.protobuf", "akka.protobuf")), cache, log) } } @@ -71,7 +73,7 @@ object Protobuf { } } - private def generate(protoc: String, srcDir: File, targetDir: File, log: Logger): Unit = { + private def generate(protoc: String, srcDir: File, targetDir: File, log: Logger, importPath: Option[File]): Unit = { val protoFiles = (srcDir ** "*.proto").get if (srcDir.exists) if (protoFiles.isEmpty) @@ -82,8 +84,13 @@ object Protobuf { log.info("Generating %d protobuf files from %s to %s".format(protoFiles.size, srcDir, targetDir)) protoFiles.foreach { proto ⇒ log.info("Compiling %s" format proto) } + val protoPathArg = importPath match { + case None => Nil + case Some(p) => Seq("--proto_path", p.absolutePath) + } + val exitCode = callProtoc(protoc, Seq("-I" + srcDir.absolutePath, "--java_out=%s" format targetDir.absolutePath) ++ - protoFiles.map(_.absolutePath), log, { (p, l) ⇒ p ! l }) + protoPathArg ++ protoFiles.map(_.absolutePath), log, { (p, l) => p ! l }) if (exitCode != 0) sys.error("protoc returned exit code: %d" format exitCode) }