Embed akka serialization inside jackson serialization (#29455)

* Embed akka serialization inside jackson serialization

Allows re-use of existing serializers when embedding the types in
Jackson

* Fix false dead code warning

* Move marker trait and add docs

* Remove module and document adding annotations

* Review feedback
This commit is contained in:
Christopher Batey 2020-08-05 16:06:33 +01:00
parent 7348939ff4
commit 67eb74f076
8 changed files with 175 additions and 6 deletions

View file

@ -390,6 +390,20 @@ For the `jackson-cbor` and custom bindings other than `jackson-json` compression
but can be enabled in the same way as the configuration shown above but replacing `jackson-json` with
the binding name (for example `jackson-cbor`).
## Using Akka Serialization for embedded types
For types that already have an Akka Serializer defined that are embedded in types serialized with Jackson the @apidoc[AkkaSerializationSerializer] and
@apidoc[AkkaSerializationDeserializer] can be used to Akka Serialization for individual fields.
The serializer/deserializer are not enabled automatically. The `@JsonSerialize` and `@JsonDeserialize` annotation needs to be added
to the fields containing the types to be serialized with Akka Serialization.
The type will be embedded as an object with the fields:
* serId - the serializer id
* serManifest - the manifest for the type
* payload - base64 encoded bytes
## Additional configuration
### Configuration per binding

View file

@ -134,7 +134,13 @@ The factory returns a `Behavior` that can be spawned like any other behavior.
### Conflict free replicated data types
TODO example once CRDTs are in
The following CRDTs are included that can be used to build your own data model:
* @apidoc[LwwTime]
* @apidoc[Counter]
* @apidoc[akka.persistence.typed.crdt.ORSet]
Akka serializers are included for all these types and can be used to serialize when @ref[embedded in Jackson](../serialization-jackson.md#using-akka-serialization-for-embedded-types).
### Last writer wins

View file

@ -0,0 +1,48 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.jackson
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit, SerializationTestKit }
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.crdt.{ Counter, LwwTime, ORSet }
import akka.persistence.typed.jackson.ReplicatedEventSourcingJacksonSpec.{ WithCounter, WithLwwTime, WithOrSet }
import akka.serialization.jackson.{ AkkaSerializationDeserializer, AkkaSerializationSerializer, JsonSerializable }
import com.fasterxml.jackson.databind.annotation.{ JsonDeserialize, JsonSerialize }
import org.scalatest.wordspec.AnyWordSpecLike
object ReplicatedEventSourcingJacksonSpec {
final case class WithLwwTime(lwwTime: LwwTime) extends JsonSerializable
final case class WithOrSet(
@JsonDeserialize(using = classOf[AkkaSerializationDeserializer])
@JsonSerialize(using = classOf[AkkaSerializationSerializer])
orSet: ORSet[String])
extends JsonSerializable
final case class WithCounter(
@JsonDeserialize(using = classOf[AkkaSerializationDeserializer])
@JsonSerialize(using = classOf[AkkaSerializationSerializer])
counter: Counter)
extends JsonSerializable
}
class ReplicatedEventSourcingJacksonSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing {
private val serializationTestkit = new SerializationTestKit(system)
"RES jackson" should {
"serialize LwwTime" in {
val obj = WithLwwTime(LwwTime(5, ReplicaId("A")))
serializationTestkit.verifySerialization(obj)
}
"serialize ORSet" in {
val emptyOrSet = WithOrSet(ORSet.empty[String](ReplicaId("A")))
serializationTestkit.verifySerialization(emptyOrSet)
}
"serialize Counter" in {
val counter = WithCounter(Counter.empty)
serializationTestkit.verifySerialization(counter)
}
}
}

View file

@ -12,7 +12,6 @@ akka.serialization.jackson {
jackson-modules += "akka.serialization.jackson.AkkaJacksonModule"
# AkkaTypedJacksonModule optionally included if akka-actor-typed is in classpath
jackson-modules += "akka.serialization.jackson.AkkaTypedJacksonModule"
// FIXME how does that optional loading work??
# AkkaStreamsModule optionally included if akka-streams is in classpath
jackson-modules += "akka.serialization.jackson.AkkaStreamJacksonModule"
jackson-modules += "com.fasterxml.jackson.module.paramnames.ParameterNamesModule"

View file

@ -0,0 +1,42 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.serialization.jackson
import com.fasterxml.jackson.core.{ JsonGenerator, JsonParser, ObjectCodec }
import com.fasterxml.jackson.databind.{ DeserializationContext, JsonNode, SerializerProvider }
import com.fasterxml.jackson.databind.deser.std.StdScalarDeserializer
import com.fasterxml.jackson.databind.ser.std.StdScalarSerializer
import akka.serialization.{ SerializationExtension, Serializer, Serializers }
final class AkkaSerializationSerializer extends StdScalarSerializer[AnyRef](classOf[AnyRef]) with ActorSystemAccess {
def serialization = SerializationExtension(currentSystem())
override def serialize(value: AnyRef, jgen: JsonGenerator, provider: SerializerProvider): Unit = {
val serializer: Serializer = serialization.findSerializerFor(value)
val serId = serializer.identifier
val manifest = Serializers.manifestFor(serializer, value)
val serialized = serializer.toBinary(value)
jgen.writeStartObject()
jgen.writeStringField("serId", serId.toString)
jgen.writeStringField("serManifest", manifest)
jgen.writeBinaryField("payload", serialized)
jgen.writeEndObject()
}
}
final class AkkaSerializationDeserializer
extends StdScalarDeserializer[AnyRef](classOf[AnyRef])
with ActorSystemAccess {
def serialization = SerializationExtension(currentSystem())
def deserialize(jp: JsonParser, ctxt: DeserializationContext): AnyRef = {
val codec: ObjectCodec = jp.getCodec()
val jsonNode = codec.readTree[JsonNode](jp)
val id = jsonNode.get("serId").textValue().toInt
val manifest = jsonNode.get("serManifest").textValue()
val payload = jsonNode.get("payload").binaryValue()
serialization.deserialize(payload, id, manifest).get
}
}

View file

@ -2,6 +2,7 @@ akka {
actor {
serialization-bindings {
"akka.serialization.jackson.CborSerializable" = jackson-cbor
"akka.serialization.jackson.JsonSerializable" = jackson-json
}
}
}

View file

@ -18,7 +18,6 @@ import java.util.logging.FileHandler
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration
import com.fasterxml.jackson.annotation.JsonSubTypes
import com.fasterxml.jackson.annotation.JsonTypeInfo
import com.fasterxml.jackson.core.JsonFactory
@ -44,7 +43,6 @@ import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Address
@ -55,8 +53,11 @@ import akka.actor.setup.ActorSystemSetup
import akka.actor.typed.scaladsl.Behaviors
import akka.serialization.Serialization
import akka.serialization.SerializationExtension
import akka.testkit.TestActors
import akka.testkit.TestKit
import akka.serialization.SerializerWithStringManifest
import akka.testkit.{ TestActors, TestKit }
import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.databind.annotation.JsonSerialize
object ScalaTestMessages {
trait TestMessage
@ -115,6 +116,42 @@ object ScalaTestMessages {
extends TestMessage
// #jackson-scala-enumeration
//delegate to AkkaSerialization
object HasAkkaSerializer {
def apply(description: String): HasAkkaSerializer = new HasAkkaSerializer(description)
}
// make sure jackson would fail
class HasAkkaSerializer private (@JsonIgnore val description: String) {
override def toString: String = s"InnerSerialization($description)"
def canEqual(other: Any): Boolean = other.isInstanceOf[HasAkkaSerializer]
override def equals(other: Any): Boolean = other match {
case that: HasAkkaSerializer =>
(that.canEqual(this)) &&
description == that.description
case _ => false
}
override def hashCode(): Int = {
val state = Seq(description)
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}
}
class InnerSerializationSerializer extends SerializerWithStringManifest {
override def identifier: Int = 123451
override def manifest(o: AnyRef): String = "M"
override def toBinary(o: AnyRef): Array[Byte] = o.asInstanceOf[HasAkkaSerializer].description.getBytes()
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = HasAkkaSerializer(new String(bytes))
}
final case class WithAkkaSerializer(
@JsonDeserialize(using = classOf[AkkaSerializationDeserializer])
@JsonSerialize(using = classOf[AkkaSerializationSerializer])
akkaSerializer: HasAkkaSerializer)
extends TestMessage
}
class ScalaTestEventMigration extends JacksonMigration {
@ -639,6 +676,14 @@ abstract class JacksonSerializerSpec(serializerName: String)
}
}
akka.serialization.jackson.allowed-class-prefix = ["akka.serialization.jackson.ScalaTestMessages$$OldCommand"]
akka.actor {
serializers {
inner-serializer = "akka.serialization.jackson.ScalaTestMessages$$InnerSerializationSerializer"
}
serialization-bindings {
"akka.serialization.jackson.ScalaTestMessages$$HasAkkaSerializer" = "inner-serializer"
}
}
""")))
with AnyWordSpecLike
with Matchers
@ -962,6 +1007,10 @@ abstract class JacksonSerializerSpec(serializerName: String)
}
}
"delegate to akka serialization" in {
checkSerialization(WithAkkaSerializer(HasAkkaSerializer("cat")))
}
}
}

View file

@ -0,0 +1,10 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.serialization.jackson
/**
* Marker trait for serialization with Jackson JSON in tests
*/
trait JsonSerializable