serializer for akka.actor.typed.ActorRef
* most convenient for users to include it akka-serialization-jackson and load it when akka-actor-typed is in classpath * provided dependency to akka-actor-typed
This commit is contained in:
parent
6122966fca
commit
93017d05c7
8 changed files with 149 additions and 18 deletions
|
|
@ -12,6 +12,8 @@ akka.serialization.jackson {
|
||||||
# It is also possible to use jackson-modules = ["*"] to dynamically
|
# It is also possible to use jackson-modules = ["*"] to dynamically
|
||||||
# find and register all modules in the classpath.
|
# find and register all modules in the classpath.
|
||||||
jackson-modules += "akka.serialization.jackson.AkkaJacksonModule"
|
jackson-modules += "akka.serialization.jackson.AkkaJacksonModule"
|
||||||
|
# AkkaTypedJacksonModule optionally included if akka-actor-typed is in classpath
|
||||||
|
jackson-modules += "akka.serialization.jackson.AkkaTypedJacksonModule"
|
||||||
jackson-modules += "com.fasterxml.jackson.module.paramnames.ParameterNamesModule"
|
jackson-modules += "com.fasterxml.jackson.module.paramnames.ParameterNamesModule"
|
||||||
jackson-modules += "com.fasterxml.jackson.datatype.jdk8.Jdk8Module"
|
jackson-modules += "com.fasterxml.jackson.datatype.jdk8.Jdk8Module"
|
||||||
jackson-modules += "com.fasterxml.jackson.datatype.jsr310.JavaTimeModule"
|
jackson-modules += "com.fasterxml.jackson.datatype.jsr310.JavaTimeModule"
|
||||||
|
|
|
||||||
|
|
@ -16,8 +16,6 @@ import com.fasterxml.jackson.databind.SerializerProvider
|
||||||
import com.fasterxml.jackson.databind.deser.std.StdScalarDeserializer
|
import com.fasterxml.jackson.databind.deser.std.StdScalarDeserializer
|
||||||
import com.fasterxml.jackson.databind.ser.std.StdScalarSerializer
|
import com.fasterxml.jackson.databind.ser.std.StdScalarSerializer
|
||||||
|
|
||||||
// FIXME add serializer for Typed ActorRef also (probably have to be in akka-cluster-typed module)
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API: Adds support for serializing and deserializing [[ActorRef]].
|
* INTERNAL API: Adds support for serializing and deserializing [[ActorRef]].
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -12,3 +12,9 @@ class AkkaJacksonModule extends JacksonModule with ActorRefModule with AddressMo
|
||||||
}
|
}
|
||||||
|
|
||||||
object AkkaJacksonModule extends AkkaJacksonModule
|
object AkkaJacksonModule extends AkkaJacksonModule
|
||||||
|
|
||||||
|
class AkkaTypedJacksonModule extends JacksonModule with TypedActorRefModule {
|
||||||
|
override def getModuleName = "AkkaTypedJacksonModule"
|
||||||
|
}
|
||||||
|
|
||||||
|
object AkkaTypedJacksonModule extends AkkaJacksonModule
|
||||||
|
|
|
||||||
|
|
@ -87,17 +87,20 @@ object JacksonObjectMapperProvider extends ExtensionId[JacksonObjectMapperProvid
|
||||||
ObjectMapper.findModules(dynamicAccess.classLoader).asScala
|
ObjectMapper.findModules(dynamicAccess.classLoader).asScala
|
||||||
else
|
else
|
||||||
configuredModules.flatMap { fqcn ⇒
|
configuredModules.flatMap { fqcn ⇒
|
||||||
dynamicAccess.createInstanceFor[Module](fqcn, Nil) match {
|
if (isModuleEnabled(fqcn, dynamicAccess)) {
|
||||||
case Success(m) ⇒ Some(m)
|
dynamicAccess.createInstanceFor[Module](fqcn, Nil) match {
|
||||||
case Failure(e) ⇒
|
case Success(m) ⇒ Some(m)
|
||||||
log.foreach(
|
case Failure(e) ⇒
|
||||||
_.error(
|
log.foreach(
|
||||||
e,
|
_.error(
|
||||||
s"Could not load configured Jackson module [$fqcn], " +
|
e,
|
||||||
"please verify classpath dependencies or amend the configuration " +
|
s"Could not load configured Jackson module [$fqcn], " +
|
||||||
"[akka.serialization.jackson-modules]. Continuing without this module."))
|
"please verify classpath dependencies or amend the configuration " +
|
||||||
None
|
"[akka.serialization.jackson-modules]. Continuing without this module."))
|
||||||
}
|
None
|
||||||
|
}
|
||||||
|
} else
|
||||||
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
val modules2 = modules1.map { module ⇒
|
val modules2 = modules1.map { module ⇒
|
||||||
|
|
@ -119,6 +122,19 @@ object JacksonObjectMapperProvider extends ExtensionId[JacksonObjectMapperProvid
|
||||||
mapper
|
mapper
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def isModuleEnabled(fqcn: String, dynamicAccess: DynamicAccess): Boolean = {
|
||||||
|
// akka-actor-typed dependency is "provided" and may not be included
|
||||||
|
if (fqcn == "akka.serialization.jackson.AkkaTypedJacksonModule") {
|
||||||
|
dynamicAccess.getClassFor("akka.actor.typed.ActorRef") match {
|
||||||
|
case Failure(_: ClassNotFoundException | _: NoClassDefFoundError) =>
|
||||||
|
false // akka-actor-typed not in classpath
|
||||||
|
case _ =>
|
||||||
|
true
|
||||||
|
}
|
||||||
|
} else
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
private def features(config: Config, section: String): immutable.Seq[(String, Boolean)] = {
|
private def features(config: Config, section: String): immutable.Seq[(String, Boolean)] = {
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
val cfg = config.getConfig(section)
|
val cfg = config.getConfig(section)
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,66 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.serialization.jackson
|
||||||
|
|
||||||
|
import akka.actor.typed.ActorRef
|
||||||
|
import akka.actor.typed.ActorRefResolver
|
||||||
|
import akka.actor.typed.scaladsl.adapter._
|
||||||
|
import akka.annotation.InternalApi
|
||||||
|
import com.fasterxml.jackson.core.JsonGenerator
|
||||||
|
import com.fasterxml.jackson.core.JsonParser
|
||||||
|
import com.fasterxml.jackson.core.JsonTokenId
|
||||||
|
import com.fasterxml.jackson.databind.DeserializationContext
|
||||||
|
import com.fasterxml.jackson.databind.SerializerProvider
|
||||||
|
import com.fasterxml.jackson.databind.deser.std.StdScalarDeserializer
|
||||||
|
import com.fasterxml.jackson.databind.ser.std.StdScalarSerializer
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API: Adds support for serializing and deserializing [[akka.actor.typed.ActorRef]].
|
||||||
|
*/
|
||||||
|
@InternalApi private[akka] trait TypedActorRefModule extends JacksonModule {
|
||||||
|
addSerializer(classOf[ActorRef[_]], () => TypedActorRefSerializer.instance, () => TypedActorRefDeserializer.instance)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
@InternalApi private[akka] object TypedActorRefSerializer {
|
||||||
|
val instance: TypedActorRefSerializer = new TypedActorRefSerializer
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
@InternalApi private[akka] class TypedActorRefSerializer
|
||||||
|
extends StdScalarSerializer[ActorRef[_]](classOf[ActorRef[_]])
|
||||||
|
with ActorSystemAccess {
|
||||||
|
override def serialize(value: ActorRef[_], jgen: JsonGenerator, provider: SerializerProvider): Unit = {
|
||||||
|
val serializedActorRef = ActorRefResolver(currentSystem().toTyped).toSerializationFormat(value)
|
||||||
|
jgen.writeString(serializedActorRef)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
@InternalApi private[akka] object TypedActorRefDeserializer {
|
||||||
|
val instance: TypedActorRefDeserializer = new TypedActorRefDeserializer
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
@InternalApi private[akka] class TypedActorRefDeserializer
|
||||||
|
extends StdScalarDeserializer[ActorRef[_]](classOf[ActorRef[_]])
|
||||||
|
with ActorSystemAccess {
|
||||||
|
|
||||||
|
def deserialize(jp: JsonParser, ctxt: DeserializationContext): ActorRef[_] = {
|
||||||
|
if (jp.currentTokenId() == JsonTokenId.ID_STRING) {
|
||||||
|
val serializedActorRef = jp.getText()
|
||||||
|
ActorRefResolver(currentSystem().toTyped).resolveActorRef(serializedActorRef)
|
||||||
|
} else
|
||||||
|
ctxt.handleUnexpectedToken(handledType(), jp).asInstanceOf[ActorRef[_]]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -218,14 +218,42 @@ public interface JavaTestMessages {
|
||||||
|
|
||||||
CommandWithActorRef that = (CommandWithActorRef) o;
|
CommandWithActorRef that = (CommandWithActorRef) o;
|
||||||
|
|
||||||
if (name != null ? !name.equals(that.name) : that.name != null) return false;
|
if (!name.equals(that.name)) return false;
|
||||||
return replyTo != null ? replyTo.equals(that.replyTo) : that.replyTo == null;
|
return replyTo.equals(that.replyTo);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
int result = name != null ? name.hashCode() : 0;
|
int result = name.hashCode();
|
||||||
result = 31 * result + (replyTo != null ? replyTo.hashCode() : 0);
|
result = 31 * result + replyTo.hashCode();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public class CommandWithTypedActorRef implements TestMessage {
|
||||||
|
public final String name;
|
||||||
|
public final akka.actor.typed.ActorRef<String> replyTo;
|
||||||
|
|
||||||
|
public CommandWithTypedActorRef(String name, akka.actor.typed.ActorRef<String> replyTo) {
|
||||||
|
this.name = name;
|
||||||
|
this.replyTo = replyTo;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) return true;
|
||||||
|
if (o == null || getClass() != o.getClass()) return false;
|
||||||
|
|
||||||
|
CommandWithTypedActorRef that = (CommandWithTypedActorRef) o;
|
||||||
|
|
||||||
|
if (!name.equals(that.name)) return false;
|
||||||
|
return replyTo.equals(that.replyTo);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
int result = name.hashCode();
|
||||||
|
result = 31 * result + replyTo.hashCode();
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,7 @@ import akka.actor.BootstrapSetup
|
||||||
import akka.actor.ExtendedActorSystem
|
import akka.actor.ExtendedActorSystem
|
||||||
import akka.actor.Status
|
import akka.actor.Status
|
||||||
import akka.actor.setup.ActorSystemSetup
|
import akka.actor.setup.ActorSystemSetup
|
||||||
|
import akka.actor.typed.scaladsl.Behaviors
|
||||||
import akka.serialization.Serialization
|
import akka.serialization.Serialization
|
||||||
import akka.serialization.SerializationExtension
|
import akka.serialization.SerializationExtension
|
||||||
import akka.testkit.TestActors
|
import akka.testkit.TestActors
|
||||||
|
|
@ -53,6 +54,8 @@ object ScalaTestMessages {
|
||||||
final case class TimeCommand(timestamp: LocalDateTime, duration: FiniteDuration) extends TestMessage
|
final case class TimeCommand(timestamp: LocalDateTime, duration: FiniteDuration) extends TestMessage
|
||||||
final case class CollectionsCommand(strings: List[String], objects: Vector[SimpleCommand]) extends TestMessage
|
final case class CollectionsCommand(strings: List[String], objects: Vector[SimpleCommand]) extends TestMessage
|
||||||
final case class CommandWithActorRef(name: String, replyTo: ActorRef) extends TestMessage
|
final case class CommandWithActorRef(name: String, replyTo: ActorRef) extends TestMessage
|
||||||
|
final case class CommandWithTypedActorRef(name: String, replyTo: akka.actor.typed.ActorRef[String])
|
||||||
|
extends TestMessage
|
||||||
final case class CommandWithAddress(name: String, address: Address) extends TestMessage
|
final case class CommandWithAddress(name: String, address: Address) extends TestMessage
|
||||||
|
|
||||||
final case class Event1(field1: String) extends TestMessage
|
final case class Event1(field1: String) extends TestMessage
|
||||||
|
|
@ -365,6 +368,12 @@ abstract class JacksonSerializerSpec(serializerName: String)
|
||||||
checkSerialization(new CommandWithActorRef("echo", echo))
|
checkSerialization(new CommandWithActorRef("echo", echo))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"serialize with typed.ActorRef" in {
|
||||||
|
import akka.actor.typed.scaladsl.adapter._
|
||||||
|
val ref = system.spawnAnonymous(Behaviors.empty[String])
|
||||||
|
checkSerialization(new CommandWithTypedActorRef("echo", ref))
|
||||||
|
}
|
||||||
|
|
||||||
"serialize with Address" in {
|
"serialize with Address" in {
|
||||||
val address = Address("akka", "sys", "localhost", 2552)
|
val address = Address("akka", "sys", "localhost", 2552)
|
||||||
checkSerialization(new CommandWithAddress("echo", address))
|
checkSerialization(new CommandWithAddress("echo", address))
|
||||||
|
|
@ -450,6 +459,12 @@ abstract class JacksonSerializerSpec(serializerName: String)
|
||||||
checkSerialization(CommandWithActorRef("echo", echo))
|
checkSerialization(CommandWithActorRef("echo", echo))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"serialize with typed.ActorRef" in {
|
||||||
|
import akka.actor.typed.scaladsl.adapter._
|
||||||
|
val ref = system.spawnAnonymous(Behaviors.empty[String])
|
||||||
|
checkSerialization(CommandWithTypedActorRef("echo", ref))
|
||||||
|
}
|
||||||
|
|
||||||
"serialize with Address" in {
|
"serialize with Address" in {
|
||||||
val address = Address("akka", "sys", "localhost", 2552)
|
val address = Address("akka", "sys", "localhost", 2552)
|
||||||
checkSerialization(CommandWithAddress("echo", address))
|
checkSerialization(CommandWithAddress("echo", address))
|
||||||
|
|
|
||||||
|
|
@ -239,7 +239,7 @@ lazy val docs = akkaModule("akka-docs")
|
||||||
.disablePlugins(ScalafixPlugin)
|
.disablePlugins(ScalafixPlugin)
|
||||||
|
|
||||||
lazy val jackson = akkaModule("akka-serialization-jackson")
|
lazy val jackson = akkaModule("akka-serialization-jackson")
|
||||||
.dependsOn(actor, actorTests % "test->test", testkit % "test->test")
|
.dependsOn(actor, actorTyped % "provided->compile", actorTests % "test->test", testkit % "test->test")
|
||||||
.settings(Dependencies.jackson)
|
.settings(Dependencies.jackson)
|
||||||
.settings(AutomaticModuleName.settings("akka.serialization.jackson"))
|
.settings(AutomaticModuleName.settings("akka.serialization.jackson"))
|
||||||
.settings(OSGi.jackson)
|
.settings(OSGi.jackson)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue