Typed actorref serializer to allow for java serialization of messages (#23696)
* typed actorref serializer to allow for java serialization of messages * review adressed
This commit is contained in:
parent
9388c69b50
commit
ad103db43c
5 changed files with 111 additions and 15 deletions
|
|
@ -0,0 +1,51 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
package akka.typed.cluster.internal
|
||||||
|
|
||||||
|
import akka.serialization.{ SerializationExtension, SerializerWithStringManifest }
|
||||||
|
import akka.typed.{ ActorRef, TypedSpec }
|
||||||
|
import akka.typed.TypedSpec.Create
|
||||||
|
import akka.typed.internal.adapter.ActorSystemAdapter
|
||||||
|
import akka.typed.scaladsl.Actor
|
||||||
|
import akka.typed.scaladsl.AskPattern._
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
||||||
|
object MiscMessageSerializerSpec {
|
||||||
|
def config = ConfigFactory.parseString(
|
||||||
|
"""
|
||||||
|
akka.actor {
|
||||||
|
provider = cluster
|
||||||
|
serialize-messages = off
|
||||||
|
allow-java-serialization = true
|
||||||
|
}
|
||||||
|
akka.remote.netty.tcp.port = 0
|
||||||
|
akka.remote.artery.canonical.port = 0
|
||||||
|
""")
|
||||||
|
}
|
||||||
|
|
||||||
|
class MiscMessageSerializerSpec extends TypedSpec(MiscMessageSerializerSpec.config) {
|
||||||
|
|
||||||
|
object `The typed MiscMessageSerializer` {
|
||||||
|
|
||||||
|
def `must serialize and deserialize typed actor refs `(): Unit = {
|
||||||
|
|
||||||
|
val ref = (adaptedSystem ? Create(Actor.empty[Unit], "some-actor")).futureValue
|
||||||
|
|
||||||
|
val serialization = SerializationExtension(ActorSystemAdapter.toUntyped(adaptedSystem))
|
||||||
|
|
||||||
|
val serializer = serialization.findSerializerFor(ref) match {
|
||||||
|
case s: SerializerWithStringManifest ⇒ s
|
||||||
|
}
|
||||||
|
|
||||||
|
val manifest = serializer.manifest(ref)
|
||||||
|
val serialized = serializer.toBinary(ref)
|
||||||
|
|
||||||
|
val result = serializer.fromBinary(serialized, manifest)
|
||||||
|
|
||||||
|
result should ===(ref)
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -33,7 +33,7 @@ object ClusterReceptionistSpec {
|
||||||
akka.actor {
|
akka.actor {
|
||||||
provider = cluster
|
provider = cluster
|
||||||
serialize-messages = off
|
serialize-messages = off
|
||||||
allow-java-serialization = off
|
allow-java-serialization = true
|
||||||
serializers {
|
serializers {
|
||||||
test = "akka.typed.cluster.receptionist.ClusterReceptionistSpec$PingSerializer"
|
test = "akka.typed.cluster.receptionist.ClusterReceptionistSpec$PingSerializer"
|
||||||
}
|
}
|
||||||
|
|
@ -41,8 +41,8 @@ object ClusterReceptionistSpec {
|
||||||
"akka.typed.cluster.receptionist.ClusterReceptionistSpec$Ping" = test
|
"akka.typed.cluster.receptionist.ClusterReceptionistSpec$Ping" = test
|
||||||
"akka.typed.cluster.receptionist.ClusterReceptionistSpec$Pong$" = test
|
"akka.typed.cluster.receptionist.ClusterReceptionistSpec$Pong$" = test
|
||||||
"akka.typed.cluster.receptionist.ClusterReceptionistSpec$Perish$" = test
|
"akka.typed.cluster.receptionist.ClusterReceptionistSpec$Perish$" = test
|
||||||
"akka.typed.internal.receptionist.ReceptionistImpl$DefaultServiceKey" = test
|
# for now, using Java serializers is good enough (tm), see #23687
|
||||||
"akka.typed.internal.adapter.ActorRefAdapter" = test
|
# "akka.typed.internal.receptionist.ReceptionistImpl$DefaultServiceKey" = test
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
akka.remote.artery.enabled = true
|
akka.remote.artery.enabled = true
|
||||||
|
|
@ -75,24 +75,18 @@ object ClusterReceptionistSpec {
|
||||||
case _: Ping ⇒ "a"
|
case _: Ping ⇒ "a"
|
||||||
case Pong ⇒ "b"
|
case Pong ⇒ "b"
|
||||||
case Perish ⇒ "c"
|
case Perish ⇒ "c"
|
||||||
case ReceptionistImpl.DefaultServiceKey(id) ⇒ "d"
|
|
||||||
case a: ActorRefAdapter[_] ⇒ "e"
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def toBinary(o: AnyRef): Array[Byte] = o match {
|
def toBinary(o: AnyRef): Array[Byte] = o match {
|
||||||
case p: Ping ⇒ ActorRefResolver(system.toTyped).toSerializationFormat(p.respondTo).getBytes(StandardCharsets.UTF_8)
|
case p: Ping ⇒ ActorRefResolver(system.toTyped).toSerializationFormat(p.respondTo).getBytes(StandardCharsets.UTF_8)
|
||||||
case Pong ⇒ Array.emptyByteArray
|
case Pong ⇒ Array.emptyByteArray
|
||||||
case Perish ⇒ Array.emptyByteArray
|
case Perish ⇒ Array.emptyByteArray
|
||||||
case ReceptionistImpl.DefaultServiceKey(id) ⇒ id.getBytes(StandardCharsets.UTF_8)
|
|
||||||
case a: ActorRefAdapter[_] ⇒ ActorRefResolver(system.toTyped).toSerializationFormat(a).getBytes(StandardCharsets.UTF_8)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
|
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
|
||||||
case "a" ⇒ Ping(ActorRefResolver(system.toTyped).resolveActorRef(new String(bytes, StandardCharsets.UTF_8)))
|
case "a" ⇒ Ping(ActorRefResolver(system.toTyped).resolveActorRef(new String(bytes, StandardCharsets.UTF_8)))
|
||||||
case "b" ⇒ Pong
|
case "b" ⇒ Pong
|
||||||
case "c" ⇒ Perish
|
case "c" ⇒ Perish
|
||||||
case "d" ⇒ ReceptionistImpl.DefaultServiceKey[Any](new String(bytes, StandardCharsets.UTF_8))
|
|
||||||
case "e" ⇒ ActorRefResolver(system.toTyped).resolveActorRef(new String(bytes, StandardCharsets.UTF_8))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -28,3 +28,17 @@ akka.typed {
|
||||||
#
|
#
|
||||||
library-extensions = ${?akka.library-extensions} []
|
library-extensions = ${?akka.library-extensions} []
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# TODO: move these out somewhere else when doing #23632
|
||||||
|
akka.actor {
|
||||||
|
serializers {
|
||||||
|
typed-misc = "akka.typed.cluster.internal.MiscMessageSerializer"
|
||||||
|
}
|
||||||
|
serialization-identifiers {
|
||||||
|
"akka.typed.cluster.internal.MiscMessageSerializer" = 24
|
||||||
|
}
|
||||||
|
serialization-bindings {
|
||||||
|
"akka.typed.ActorRef" = typed-misc
|
||||||
|
"akka.typed.internal.adapter.ActorRefAdapter" = typed-misc
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,11 +3,16 @@
|
||||||
*/
|
*/
|
||||||
package akka.typed.cluster
|
package akka.typed.cluster
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets
|
||||||
|
|
||||||
import akka.typed.ActorSystem
|
import akka.typed.ActorSystem
|
||||||
import akka.typed.Extension
|
import akka.typed.Extension
|
||||||
import akka.typed.ExtensionId
|
import akka.typed.ExtensionId
|
||||||
import akka.typed.ActorRef
|
import akka.typed.ActorRef
|
||||||
|
import akka.typed.scaladsl.adapter._
|
||||||
import akka.actor.ExtendedActorSystem
|
import akka.actor.ExtendedActorSystem
|
||||||
|
import akka.annotation.InternalApi
|
||||||
|
import akka.serialization.{ BaseSerializer, SerializerWithStringManifest }
|
||||||
|
|
||||||
object ActorRefResolver extends ExtensionId[ActorRefResolver] {
|
object ActorRefResolver extends ExtensionId[ActorRefResolver] {
|
||||||
def get(system: ActorSystem[_]): ActorRefResolver = apply(system)
|
def get(system: ActorSystem[_]): ActorRefResolver = apply(system)
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,32 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
package akka.typed.cluster.internal
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets
|
||||||
|
|
||||||
|
import akka.annotation.InternalApi
|
||||||
|
import akka.serialization.{ BaseSerializer, SerializerWithStringManifest }
|
||||||
|
import akka.typed.ActorRef
|
||||||
|
import akka.typed.cluster.ActorRefResolver
|
||||||
|
import akka.typed.internal.adapter.ActorRefAdapter
|
||||||
|
import akka.typed.scaladsl.adapter._
|
||||||
|
|
||||||
|
@InternalApi
|
||||||
|
class MiscMessageSerializer(val system: akka.actor.ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer {
|
||||||
|
|
||||||
|
private val resolver = ActorRefResolver(system.toTyped)
|
||||||
|
|
||||||
|
def manifest(o: AnyRef) = o match {
|
||||||
|
case ref: ActorRef[_] ⇒ "a"
|
||||||
|
}
|
||||||
|
|
||||||
|
def toBinary(o: AnyRef) = o match {
|
||||||
|
case ref: ActorRef[_] ⇒ resolver.toSerializationFormat(ref).getBytes(StandardCharsets.UTF_8)
|
||||||
|
}
|
||||||
|
|
||||||
|
def fromBinary(bytes: Array[Byte], manifest: String) = manifest match {
|
||||||
|
case "a" ⇒ resolver.resolveActorRef(new String(bytes, StandardCharsets.UTF_8))
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue