From 67eb74f07659b17cfea572b57c71c39750c4190d Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Wed, 5 Aug 2020 16:06:33 +0100 Subject: [PATCH] Embed akka serialization inside jackson serialization (#29455) * Embed akka serialization inside jackson serialization Allows re-use of existing serializers when embedding the types in Jackson * Fix false dead code warning * Move marker trait and add docs * Remove module and document adding annotations * Review feedback --- .../src/main/paradox/serialization-jackson.md | 14 +++++ .../paradox/typed/replicated-eventsourcing.md | 8 ++- .../ReplicatedEventSourcingJacksonSpec.scala | 48 ++++++++++++++++ .../src/main/resources/reference.conf | 1 - .../jackson/AkkaSerializationModule.scala | 42 ++++++++++++++ .../src/test/resources/reference.conf | 1 + .../jackson/JacksonSerializerSpec.scala | 57 +++++++++++++++++-- .../jackson/JsonSerializable.scala | 10 ++++ 8 files changed, 175 insertions(+), 6 deletions(-) create mode 100644 akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/jackson/ReplicatedEventSourcingJacksonSpec.scala create mode 100644 akka-serialization-jackson/src/main/scala/akka/serialization/jackson/AkkaSerializationModule.scala create mode 100644 akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JsonSerializable.scala diff --git a/akka-docs/src/main/paradox/serialization-jackson.md b/akka-docs/src/main/paradox/serialization-jackson.md index 83298aa788..c983cd4145 100644 --- a/akka-docs/src/main/paradox/serialization-jackson.md +++ b/akka-docs/src/main/paradox/serialization-jackson.md @@ -390,6 +390,20 @@ For the `jackson-cbor` and custom bindings other than `jackson-json` compression but can be enabled in the same way as the configuration shown above but replacing `jackson-json` with the binding name (for example `jackson-cbor`). +## Using Akka Serialization for embedded types + +For types that already have an Akka Serializer defined that are embedded in types serialized with Jackson the @apidoc[AkkaSerializationSerializer] and +@apidoc[AkkaSerializationDeserializer] can be used to Akka Serialization for individual fields. + +The serializer/deserializer are not enabled automatically. The `@JsonSerialize` and `@JsonDeserialize` annotation needs to be added +to the fields containing the types to be serialized with Akka Serialization. + +The type will be embedded as an object with the fields: + +* serId - the serializer id +* serManifest - the manifest for the type +* payload - base64 encoded bytes + ## Additional configuration ### Configuration per binding diff --git a/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md b/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md index f5c1df80b8..eb4905d35d 100644 --- a/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md +++ b/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md @@ -134,7 +134,13 @@ The factory returns a `Behavior` that can be spawned like any other behavior. ### Conflict free replicated data types -TODO example once CRDTs are in +The following CRDTs are included that can be used to build your own data model: + +* @apidoc[LwwTime] +* @apidoc[Counter] +* @apidoc[akka.persistence.typed.crdt.ORSet] + +Akka serializers are included for all these types and can be used to serialize when @ref[embedded in Jackson](../serialization-jackson.md#using-akka-serialization-for-embedded-types). ### Last writer wins diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/jackson/ReplicatedEventSourcingJacksonSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/jackson/ReplicatedEventSourcingJacksonSpec.scala new file mode 100644 index 0000000000..df992e26c3 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/jackson/ReplicatedEventSourcingJacksonSpec.scala @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.typed.jackson + +import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit, SerializationTestKit } +import akka.persistence.typed.ReplicaId +import akka.persistence.typed.crdt.{ Counter, LwwTime, ORSet } +import akka.persistence.typed.jackson.ReplicatedEventSourcingJacksonSpec.{ WithCounter, WithLwwTime, WithOrSet } +import akka.serialization.jackson.{ AkkaSerializationDeserializer, AkkaSerializationSerializer, JsonSerializable } +import com.fasterxml.jackson.databind.annotation.{ JsonDeserialize, JsonSerialize } +import org.scalatest.wordspec.AnyWordSpecLike + +object ReplicatedEventSourcingJacksonSpec { + final case class WithLwwTime(lwwTime: LwwTime) extends JsonSerializable + final case class WithOrSet( + @JsonDeserialize(using = classOf[AkkaSerializationDeserializer]) + @JsonSerialize(using = classOf[AkkaSerializationSerializer]) + orSet: ORSet[String]) + extends JsonSerializable + final case class WithCounter( + @JsonDeserialize(using = classOf[AkkaSerializationDeserializer]) + @JsonSerialize(using = classOf[AkkaSerializationSerializer]) + counter: Counter) + extends JsonSerializable + +} + +class ReplicatedEventSourcingJacksonSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing { + + private val serializationTestkit = new SerializationTestKit(system) + + "RES jackson" should { + "serialize LwwTime" in { + val obj = WithLwwTime(LwwTime(5, ReplicaId("A"))) + serializationTestkit.verifySerialization(obj) + } + "serialize ORSet" in { + val emptyOrSet = WithOrSet(ORSet.empty[String](ReplicaId("A"))) + serializationTestkit.verifySerialization(emptyOrSet) + } + "serialize Counter" in { + val counter = WithCounter(Counter.empty) + serializationTestkit.verifySerialization(counter) + } + } +} diff --git a/akka-serialization-jackson/src/main/resources/reference.conf b/akka-serialization-jackson/src/main/resources/reference.conf index d06d1fce3d..fd041f9012 100644 --- a/akka-serialization-jackson/src/main/resources/reference.conf +++ b/akka-serialization-jackson/src/main/resources/reference.conf @@ -12,7 +12,6 @@ akka.serialization.jackson { jackson-modules += "akka.serialization.jackson.AkkaJacksonModule" # AkkaTypedJacksonModule optionally included if akka-actor-typed is in classpath jackson-modules += "akka.serialization.jackson.AkkaTypedJacksonModule" - // FIXME how does that optional loading work?? # AkkaStreamsModule optionally included if akka-streams is in classpath jackson-modules += "akka.serialization.jackson.AkkaStreamJacksonModule" jackson-modules += "com.fasterxml.jackson.module.paramnames.ParameterNamesModule" diff --git a/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/AkkaSerializationModule.scala b/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/AkkaSerializationModule.scala new file mode 100644 index 0000000000..406a07f62e --- /dev/null +++ b/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/AkkaSerializationModule.scala @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package akka.serialization.jackson + +import com.fasterxml.jackson.core.{ JsonGenerator, JsonParser, ObjectCodec } +import com.fasterxml.jackson.databind.{ DeserializationContext, JsonNode, SerializerProvider } +import com.fasterxml.jackson.databind.deser.std.StdScalarDeserializer +import com.fasterxml.jackson.databind.ser.std.StdScalarSerializer +import akka.serialization.{ SerializationExtension, Serializer, Serializers } + +final class AkkaSerializationSerializer extends StdScalarSerializer[AnyRef](classOf[AnyRef]) with ActorSystemAccess { + def serialization = SerializationExtension(currentSystem()) + override def serialize(value: AnyRef, jgen: JsonGenerator, provider: SerializerProvider): Unit = { + val serializer: Serializer = serialization.findSerializerFor(value) + val serId = serializer.identifier + val manifest = Serializers.manifestFor(serializer, value) + val serialized = serializer.toBinary(value) + jgen.writeStartObject() + jgen.writeStringField("serId", serId.toString) + jgen.writeStringField("serManifest", manifest) + jgen.writeBinaryField("payload", serialized) + jgen.writeEndObject() + } +} + +final class AkkaSerializationDeserializer + extends StdScalarDeserializer[AnyRef](classOf[AnyRef]) + with ActorSystemAccess { + + def serialization = SerializationExtension(currentSystem()) + + def deserialize(jp: JsonParser, ctxt: DeserializationContext): AnyRef = { + val codec: ObjectCodec = jp.getCodec() + val jsonNode = codec.readTree[JsonNode](jp) + val id = jsonNode.get("serId").textValue().toInt + val manifest = jsonNode.get("serManifest").textValue() + val payload = jsonNode.get("payload").binaryValue() + serialization.deserialize(payload, id, manifest).get + } +} diff --git a/akka-serialization-jackson/src/test/resources/reference.conf b/akka-serialization-jackson/src/test/resources/reference.conf index d75292e3c0..668c907f19 100644 --- a/akka-serialization-jackson/src/test/resources/reference.conf +++ b/akka-serialization-jackson/src/test/resources/reference.conf @@ -2,6 +2,7 @@ akka { actor { serialization-bindings { "akka.serialization.jackson.CborSerializable" = jackson-cbor + "akka.serialization.jackson.JsonSerializable" = jackson-json } } } diff --git a/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JacksonSerializerSpec.scala b/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JacksonSerializerSpec.scala index c82bf11a45..715f31bbcc 100644 --- a/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JacksonSerializerSpec.scala +++ b/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JacksonSerializerSpec.scala @@ -18,7 +18,6 @@ import java.util.logging.FileHandler import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.duration.FiniteDuration - import com.fasterxml.jackson.annotation.JsonSubTypes import com.fasterxml.jackson.annotation.JsonTypeInfo import com.fasterxml.jackson.core.JsonFactory @@ -44,7 +43,6 @@ import com.typesafe.config.ConfigFactory import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike - import akka.actor.ActorRef import akka.actor.ActorSystem import akka.actor.Address @@ -55,8 +53,11 @@ import akka.actor.setup.ActorSystemSetup import akka.actor.typed.scaladsl.Behaviors import akka.serialization.Serialization import akka.serialization.SerializationExtension -import akka.testkit.TestActors -import akka.testkit.TestKit +import akka.serialization.SerializerWithStringManifest +import akka.testkit.{ TestActors, TestKit } +import com.fasterxml.jackson.annotation.JsonIgnore +import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import com.fasterxml.jackson.databind.annotation.JsonSerialize object ScalaTestMessages { trait TestMessage @@ -115,6 +116,42 @@ object ScalaTestMessages { extends TestMessage // #jackson-scala-enumeration + //delegate to AkkaSerialization + object HasAkkaSerializer { + def apply(description: String): HasAkkaSerializer = new HasAkkaSerializer(description) + } + // make sure jackson would fail + class HasAkkaSerializer private (@JsonIgnore val description: String) { + + override def toString: String = s"InnerSerialization($description)" + + def canEqual(other: Any): Boolean = other.isInstanceOf[HasAkkaSerializer] + + override def equals(other: Any): Boolean = other match { + case that: HasAkkaSerializer => + (that.canEqual(this)) && + description == that.description + case _ => false + } + + override def hashCode(): Int = { + val state = Seq(description) + state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b) + } + } + + class InnerSerializationSerializer extends SerializerWithStringManifest { + override def identifier: Int = 123451 + override def manifest(o: AnyRef): String = "M" + override def toBinary(o: AnyRef): Array[Byte] = o.asInstanceOf[HasAkkaSerializer].description.getBytes() + override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = HasAkkaSerializer(new String(bytes)) + } + + final case class WithAkkaSerializer( + @JsonDeserialize(using = classOf[AkkaSerializationDeserializer]) + @JsonSerialize(using = classOf[AkkaSerializationSerializer]) + akkaSerializer: HasAkkaSerializer) + extends TestMessage } class ScalaTestEventMigration extends JacksonMigration { @@ -639,6 +676,14 @@ abstract class JacksonSerializerSpec(serializerName: String) } } akka.serialization.jackson.allowed-class-prefix = ["akka.serialization.jackson.ScalaTestMessages$$OldCommand"] + akka.actor { + serializers { + inner-serializer = "akka.serialization.jackson.ScalaTestMessages$$InnerSerializationSerializer" + } + serialization-bindings { + "akka.serialization.jackson.ScalaTestMessages$$HasAkkaSerializer" = "inner-serializer" + } + } """))) with AnyWordSpecLike with Matchers @@ -962,6 +1007,10 @@ abstract class JacksonSerializerSpec(serializerName: String) } } + "delegate to akka serialization" in { + checkSerialization(WithAkkaSerializer(HasAkkaSerializer("cat"))) + } + } } diff --git a/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JsonSerializable.scala b/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JsonSerializable.scala new file mode 100644 index 0000000000..d0ec9e82b1 --- /dev/null +++ b/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JsonSerializable.scala @@ -0,0 +1,10 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package akka.serialization.jackson + +/** + * Marker trait for serialization with Jackson JSON in tests + */ +trait JsonSerializable