Support Protobuf serialization/deserialization of akka.actor.Identify and akka.actor.ActorIdentity (#20380)

This is to ensure Akka Cluster Client will work with Java Serialization turned off.

However for the sake of backward compatibility, the protobuf serialization for `akka.actor.Identify`
and `akka.actor.ActorIdentity` is turned off.
This commit is contained in:
Felix Satyaputra 2016-04-28 23:33:59 +10:00 committed by Konrad Malawski
parent 088bf1b842
commit c4aec5e8a2
5 changed files with 2571 additions and 4 deletions

File diff suppressed because it is too large Load diff

View file

@ -26,4 +26,23 @@ enum PatternType {
message Selection {
required PatternType type = 1;
optional string matcher = 2;
}
}
message Identify {
required Payload messageId = 1;
}
message ActorIdentity {
required Payload correlationId = 1;
optional ActorRef ref = 2;
}
message ActorRef {
required string path = 1;
}
message Payload {
required bytes enclosedMessage = 1;
required int32 serializerId = 2;
optional bytes messageManifest = 4;
}

View file

@ -14,12 +14,20 @@ akka {
serializers {
akka-containers = "akka.remote.serialization.MessageContainerSerializer"
akka-misc = "akka.remote.serialization.MiscMessageSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
daemon-create = "akka.remote.serialization.DaemonMsgCreateSerializer"
}
serialization-bindings {
"akka.actor.ActorSelectionMessage" = akka-containers
# The classes akka.actor.Identify and akka.actor.ActorIdentity serialization/deserialization are required by
# the cluster client to work.
# For the purpose of preserving protocol backward compatibility, akka.actor.Identify and akka.actor.ActorIdentity
# are stil using java serialization by default.
# Should java serialization is disabled, uncomment the following lines
# "akka.actor.Identify" = akka-misc
# "akka.actor.ActorIdentity" = akka-misc
"akka.remote.DaemonMsgCreate" = daemon-create
# Since akka.protobuf.Message does not extend Serializable but
@ -40,6 +48,7 @@ akka {
"akka.remote.serialization.ProtobufSerializer" = 2
"akka.remote.serialization.DaemonMsgCreateSerializer" = 3
"akka.remote.serialization.MessageContainerSerializer" = 6
"akka.remote.serialization.MiscMessageSerializer" = 16
}
deployment {

View file

@ -0,0 +1,117 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.serialization
import akka.actor._
import akka.protobuf.ByteString
import akka.remote.ContainerFormats
import akka.serialization.{ Serialization, BaseSerializer, SerializationExtension, SerializerWithStringManifest }
class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer {
private lazy val serialization = SerializationExtension(system)
def toBinary(obj: AnyRef): Array[Byte] = obj match {
case identify: Identify serializeIdentify(identify)
case identity: ActorIdentity serializeActorIdentity(identity)
case _ throw new IllegalArgumentException(s"Cannot serialize object of type [${obj.getClass.getName}]")
}
private def serializeIdentify(identify: Identify): Array[Byte] =
ContainerFormats.Identify.newBuilder()
.setMessageId(payloadBuilder(identify.messageId))
.build()
.toByteArray
private def serializeActorIdentity(actorIdentity: ActorIdentity): Array[Byte] = {
val builder =
ContainerFormats.ActorIdentity.newBuilder()
.setCorrelationId(payloadBuilder(actorIdentity.correlationId))
actorIdentity.ref.foreach { actorRef
builder.setRef(actorRefBuilder(actorRef))
}
builder
.build()
.toByteArray
}
private def actorRefBuilder(actorRef: ActorRef): ContainerFormats.ActorRef.Builder =
ContainerFormats.ActorRef.newBuilder()
.setPath(Serialization.serializedActorPath(actorRef))
private def payloadBuilder(input: Any): ContainerFormats.Payload.Builder = {
val payload = input.asInstanceOf[AnyRef]
val builder = ContainerFormats.Payload.newBuilder()
val serializer = serialization.findSerializerFor(payload)
builder
.setEnclosedMessage(ByteString.copyFrom(serializer.toBinary(payload)))
.setSerializerId(serializer.identifier)
serializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(payload)
if (manifest != "")
builder.setMessageManifest(ByteString.copyFromUtf8(manifest))
case _
if (serializer.includeManifest)
builder.setMessageManifest(ByteString.copyFromUtf8(payload.getClass.getName))
}
builder
}
private val IdentifyManifest = "A"
private val ActorIdentifyManifest = "B"
private val fromBinaryMap = Map[String, Array[Byte] AnyRef](
IdentifyManifest -> deserializeIdentify,
ActorIdentifyManifest -> deserializeActorIdentity)
override def manifest(o: AnyRef): String =
o match {
case _: Identify IdentifyManifest
case _: ActorIdentity ActorIdentifyManifest
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
}
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
fromBinaryMap.get(manifest) match {
case Some(deserializer) deserializer(bytes)
case None throw new IllegalArgumentException(
s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
}
private def deserializeIdentify(bytes: Array[Byte]): Identify = {
val identifyProto = ContainerFormats.Identify.parseFrom(bytes)
val messageId = deserializePayload(identifyProto.getMessageId)
Identify(messageId)
}
private def deserializeActorIdentity(bytes: Array[Byte]): ActorIdentity = {
val actorIdentityProto = ContainerFormats.ActorIdentity.parseFrom(bytes)
val correlationId = deserializePayload(actorIdentityProto.getCorrelationId)
val actorRef =
if (actorIdentityProto.hasRef)
Some(deserializeActorRef(actorIdentityProto.getRef))
else
None
ActorIdentity(correlationId, actorRef)
}
private def deserializeActorRef(actorRef: ContainerFormats.ActorRef): ActorRef =
serialization.system.provider.resolveActorRef(actorRef.getPath)
private def deserializePayload(payload: ContainerFormats.Payload): Any = {
val manifest = if (payload.hasMessageManifest) payload.getMessageManifest.toStringUtf8 else ""
serialization.deserialize(
payload.getEnclosedMessage.toByteArray,
payload.getSerializerId,
manifest).get
}
}

View file

@ -0,0 +1,63 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.serialization
import akka.actor._
import akka.remote.MessageSerializer
import akka.serialization.SerializationExtension
import akka.testkit.AkkaSpec
import com.typesafe.config.ConfigFactory
object MiscMessageSerializerSpec {
val serializationTestOverrides =
s"""
|akka.actor.serialization-bindings = {
| "akka.actor.Identify" = akka-misc
| "akka.actor.ActorIdentity" = akka-misc
|}
""".stripMargin
val testConfig = ConfigFactory.parseString(serializationTestOverrides).withFallback(AkkaSpec.testConf)
}
class MiscMessageSerializerSpec extends AkkaSpec(MiscMessageSerializerSpec.testConfig) {
"MiscMessageSerializer" must {
Seq(
"Identify" -> Identify("some-message"),
s"ActorIdentity without actor ref" -> ActorIdentity("some-message", ref = None),
s"ActorIdentity with actor ref" -> ActorIdentity("some-message", ref = Some(testActor))).foreach {
case (scenario, item)
s"resolve serializer for $scenario" in {
val serializer = SerializationExtension(system)
serializer.serializerFor(item.getClass).getClass should ===(classOf[MiscMessageSerializer])
}
s"serialize and de-serialize $scenario" in {
verifySerialization(item)
}
}
"reject invalid manifest" in {
intercept[IllegalArgumentException] {
val serializer = new MiscMessageSerializer(system.asInstanceOf[ExtendedActorSystem])
serializer.manifest("INVALID")
}
}
"reject deserialization with invalid manifest" in {
intercept[IllegalArgumentException] {
val serializer = new MiscMessageSerializer(system.asInstanceOf[ExtendedActorSystem])
serializer.fromBinary(Array.empty[Byte], "INVALID")
}
}
def verifySerialization(msg: AnyRef): Unit = {
val serializer = new MiscMessageSerializer(system.asInstanceOf[ExtendedActorSystem])
serializer.fromBinary(serializer.toBinary(msg), serializer.manifest(msg)) should ===(msg)
}
}
}