Merge pull request #27054 from akka/wip-24155-typed-actorref-patriknw

Jackson serializer typed.ActorRef, #24155
This commit is contained in:
Patrik Nordwall 2019-06-07 13:23:09 +02:00 committed by GitHub
commit 397b8792f9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 149 additions and 18 deletions

View file

@ -12,6 +12,8 @@ akka.serialization.jackson {
# It is also possible to use jackson-modules = ["*"] to dynamically # It is also possible to use jackson-modules = ["*"] to dynamically
# find and register all modules in the classpath. # find and register all modules in the classpath.
jackson-modules += "akka.serialization.jackson.AkkaJacksonModule" jackson-modules += "akka.serialization.jackson.AkkaJacksonModule"
# AkkaTypedJacksonModule optionally included if akka-actor-typed is in classpath
jackson-modules += "akka.serialization.jackson.AkkaTypedJacksonModule"
jackson-modules += "com.fasterxml.jackson.module.paramnames.ParameterNamesModule" jackson-modules += "com.fasterxml.jackson.module.paramnames.ParameterNamesModule"
jackson-modules += "com.fasterxml.jackson.datatype.jdk8.Jdk8Module" jackson-modules += "com.fasterxml.jackson.datatype.jdk8.Jdk8Module"
jackson-modules += "com.fasterxml.jackson.datatype.jsr310.JavaTimeModule" jackson-modules += "com.fasterxml.jackson.datatype.jsr310.JavaTimeModule"

View file

@ -16,8 +16,6 @@ import com.fasterxml.jackson.databind.SerializerProvider
import com.fasterxml.jackson.databind.deser.std.StdScalarDeserializer import com.fasterxml.jackson.databind.deser.std.StdScalarDeserializer
import com.fasterxml.jackson.databind.ser.std.StdScalarSerializer import com.fasterxml.jackson.databind.ser.std.StdScalarSerializer
// FIXME add serializer for Typed ActorRef also (probably have to be in akka-cluster-typed module)
/** /**
* INTERNAL API: Adds support for serializing and deserializing [[ActorRef]]. * INTERNAL API: Adds support for serializing and deserializing [[ActorRef]].
*/ */

View file

@ -12,3 +12,9 @@ class AkkaJacksonModule extends JacksonModule with ActorRefModule with AddressMo
} }
object AkkaJacksonModule extends AkkaJacksonModule object AkkaJacksonModule extends AkkaJacksonModule
class AkkaTypedJacksonModule extends JacksonModule with TypedActorRefModule {
override def getModuleName = "AkkaTypedJacksonModule"
}
object AkkaTypedJacksonModule extends AkkaJacksonModule

View file

@ -87,17 +87,20 @@ object JacksonObjectMapperProvider extends ExtensionId[JacksonObjectMapperProvid
ObjectMapper.findModules(dynamicAccess.classLoader).asScala ObjectMapper.findModules(dynamicAccess.classLoader).asScala
else else
configuredModules.flatMap { fqcn configuredModules.flatMap { fqcn
dynamicAccess.createInstanceFor[Module](fqcn, Nil) match { if (isModuleEnabled(fqcn, dynamicAccess)) {
case Success(m) Some(m) dynamicAccess.createInstanceFor[Module](fqcn, Nil) match {
case Failure(e) case Success(m) Some(m)
log.foreach( case Failure(e)
_.error( log.foreach(
e, _.error(
s"Could not load configured Jackson module [$fqcn], " + e,
"please verify classpath dependencies or amend the configuration " + s"Could not load configured Jackson module [$fqcn], " +
"[akka.serialization.jackson-modules]. Continuing without this module.")) "please verify classpath dependencies or amend the configuration " +
None "[akka.serialization.jackson-modules]. Continuing without this module."))
} None
}
} else
None
} }
val modules2 = modules1.map { module val modules2 = modules1.map { module
@ -119,6 +122,19 @@ object JacksonObjectMapperProvider extends ExtensionId[JacksonObjectMapperProvid
mapper mapper
} }
private def isModuleEnabled(fqcn: String, dynamicAccess: DynamicAccess): Boolean = {
// akka-actor-typed dependency is "provided" and may not be included
if (fqcn == "akka.serialization.jackson.AkkaTypedJacksonModule") {
dynamicAccess.getClassFor("akka.actor.typed.ActorRef") match {
case Failure(_: ClassNotFoundException | _: NoClassDefFoundError) =>
false // akka-actor-typed not in classpath
case _ =>
true
}
} else
true
}
private def features(config: Config, section: String): immutable.Seq[(String, Boolean)] = { private def features(config: Config, section: String): immutable.Seq[(String, Boolean)] = {
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
val cfg = config.getConfig(section) val cfg = config.getConfig(section)

View file

@ -0,0 +1,66 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.serialization.jackson
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorRefResolver
import akka.actor.typed.scaladsl.adapter._
import akka.annotation.InternalApi
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.core.JsonTokenId
import com.fasterxml.jackson.databind.DeserializationContext
import com.fasterxml.jackson.databind.SerializerProvider
import com.fasterxml.jackson.databind.deser.std.StdScalarDeserializer
import com.fasterxml.jackson.databind.ser.std.StdScalarSerializer
/**
* INTERNAL API: Adds support for serializing and deserializing [[akka.actor.typed.ActorRef]].
*/
@InternalApi private[akka] trait TypedActorRefModule extends JacksonModule {
addSerializer(classOf[ActorRef[_]], () => TypedActorRefSerializer.instance, () => TypedActorRefDeserializer.instance)
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object TypedActorRefSerializer {
val instance: TypedActorRefSerializer = new TypedActorRefSerializer
}
/**
* INTERNAL API
*/
@InternalApi private[akka] class TypedActorRefSerializer
extends StdScalarSerializer[ActorRef[_]](classOf[ActorRef[_]])
with ActorSystemAccess {
override def serialize(value: ActorRef[_], jgen: JsonGenerator, provider: SerializerProvider): Unit = {
val serializedActorRef = ActorRefResolver(currentSystem().toTyped).toSerializationFormat(value)
jgen.writeString(serializedActorRef)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object TypedActorRefDeserializer {
val instance: TypedActorRefDeserializer = new TypedActorRefDeserializer
}
/**
* INTERNAL API
*/
@InternalApi private[akka] class TypedActorRefDeserializer
extends StdScalarDeserializer[ActorRef[_]](classOf[ActorRef[_]])
with ActorSystemAccess {
def deserialize(jp: JsonParser, ctxt: DeserializationContext): ActorRef[_] = {
if (jp.currentTokenId() == JsonTokenId.ID_STRING) {
val serializedActorRef = jp.getText()
ActorRefResolver(currentSystem().toTyped).resolveActorRef(serializedActorRef)
} else
ctxt.handleUnexpectedToken(handledType(), jp).asInstanceOf[ActorRef[_]]
}
}

View file

@ -218,14 +218,42 @@ public interface JavaTestMessages {
CommandWithActorRef that = (CommandWithActorRef) o; CommandWithActorRef that = (CommandWithActorRef) o;
if (name != null ? !name.equals(that.name) : that.name != null) return false; if (!name.equals(that.name)) return false;
return replyTo != null ? replyTo.equals(that.replyTo) : that.replyTo == null; return replyTo.equals(that.replyTo);
} }
@Override @Override
public int hashCode() { public int hashCode() {
int result = name != null ? name.hashCode() : 0; int result = name.hashCode();
result = 31 * result + (replyTo != null ? replyTo.hashCode() : 0); result = 31 * result + replyTo.hashCode();
return result;
}
}
public class CommandWithTypedActorRef implements TestMessage {
public final String name;
public final akka.actor.typed.ActorRef<String> replyTo;
public CommandWithTypedActorRef(String name, akka.actor.typed.ActorRef<String> replyTo) {
this.name = name;
this.replyTo = replyTo;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CommandWithTypedActorRef that = (CommandWithTypedActorRef) o;
if (!name.equals(that.name)) return false;
return replyTo.equals(that.replyTo);
}
@Override
public int hashCode() {
int result = name.hashCode();
result = 31 * result + replyTo.hashCode();
return result; return result;
} }
} }

View file

@ -23,6 +23,7 @@ import akka.actor.BootstrapSetup
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
import akka.actor.Status import akka.actor.Status
import akka.actor.setup.ActorSystemSetup import akka.actor.setup.ActorSystemSetup
import akka.actor.typed.scaladsl.Behaviors
import akka.serialization.Serialization import akka.serialization.Serialization
import akka.serialization.SerializationExtension import akka.serialization.SerializationExtension
import akka.testkit.TestActors import akka.testkit.TestActors
@ -53,6 +54,8 @@ object ScalaTestMessages {
final case class TimeCommand(timestamp: LocalDateTime, duration: FiniteDuration) extends TestMessage final case class TimeCommand(timestamp: LocalDateTime, duration: FiniteDuration) extends TestMessage
final case class CollectionsCommand(strings: List[String], objects: Vector[SimpleCommand]) extends TestMessage final case class CollectionsCommand(strings: List[String], objects: Vector[SimpleCommand]) extends TestMessage
final case class CommandWithActorRef(name: String, replyTo: ActorRef) extends TestMessage final case class CommandWithActorRef(name: String, replyTo: ActorRef) extends TestMessage
final case class CommandWithTypedActorRef(name: String, replyTo: akka.actor.typed.ActorRef[String])
extends TestMessage
final case class CommandWithAddress(name: String, address: Address) extends TestMessage final case class CommandWithAddress(name: String, address: Address) extends TestMessage
final case class Event1(field1: String) extends TestMessage final case class Event1(field1: String) extends TestMessage
@ -365,6 +368,12 @@ abstract class JacksonSerializerSpec(serializerName: String)
checkSerialization(new CommandWithActorRef("echo", echo)) checkSerialization(new CommandWithActorRef("echo", echo))
} }
"serialize with typed.ActorRef" in {
import akka.actor.typed.scaladsl.adapter._
val ref = system.spawnAnonymous(Behaviors.empty[String])
checkSerialization(new CommandWithTypedActorRef("echo", ref))
}
"serialize with Address" in { "serialize with Address" in {
val address = Address("akka", "sys", "localhost", 2552) val address = Address("akka", "sys", "localhost", 2552)
checkSerialization(new CommandWithAddress("echo", address)) checkSerialization(new CommandWithAddress("echo", address))
@ -450,6 +459,12 @@ abstract class JacksonSerializerSpec(serializerName: String)
checkSerialization(CommandWithActorRef("echo", echo)) checkSerialization(CommandWithActorRef("echo", echo))
} }
"serialize with typed.ActorRef" in {
import akka.actor.typed.scaladsl.adapter._
val ref = system.spawnAnonymous(Behaviors.empty[String])
checkSerialization(CommandWithTypedActorRef("echo", ref))
}
"serialize with Address" in { "serialize with Address" in {
val address = Address("akka", "sys", "localhost", 2552) val address = Address("akka", "sys", "localhost", 2552)
checkSerialization(CommandWithAddress("echo", address)) checkSerialization(CommandWithAddress("echo", address))

View file

@ -242,7 +242,7 @@ lazy val docs = akkaModule("akka-docs")
.disablePlugins(ScalafixPlugin) .disablePlugins(ScalafixPlugin)
lazy val jackson = akkaModule("akka-serialization-jackson") lazy val jackson = akkaModule("akka-serialization-jackson")
.dependsOn(actor, actorTests % "test->test", testkit % "test->test") .dependsOn(actor, actorTyped % "optional->compile", actorTests % "test->test", testkit % "test->test")
.settings(Dependencies.jackson) .settings(Dependencies.jackson)
.settings(AutomaticModuleName.settings("akka.serialization.jackson")) .settings(AutomaticModuleName.settings("akka.serialization.jackson"))
.settings(OSGi.jackson) .settings(OSGi.jackson)