diff --git a/akka-docs/src/main/paradox/persistence-schema-evolution.md b/akka-docs/src/main/paradox/persistence-schema-evolution.md index c3627ca796..5cfcfa22d2 100644 --- a/akka-docs/src/main/paradox/persistence-schema-evolution.md +++ b/akka-docs/src/main/paradox/persistence-schema-evolution.md @@ -224,7 +224,7 @@ needs to have an associated code which indicates if it is a window or aisle seat Adding fields is the most common change you'll need to apply to your messages so make sure the serialization format you picked for your payloads can handle it apropriately, i.e. such changes should be *binary compatible*. This is achieved using the right serializer toolkit. In the following examples we will be using protobuf. -See also @ref:[how to add fields with Jackson](serialization-jackson.md#add-field). +See also @ref:[how to add fields with Jackson](serialization-jackson.md#add-optional-field). While being able to read messages with missing fields is half of the solution, you also need to deal with the missing values somehow. This is usually modeled as some kind of default value, or by representing the field as an @scala[`Option[T]`]@java[`Optional`] diff --git a/akka-docs/src/main/paradox/serialization-jackson.md b/akka-docs/src/main/paradox/serialization-jackson.md index 83298aa788..1870b5bf30 100644 --- a/akka-docs/src/main/paradox/serialization-jackson.md +++ b/akka-docs/src/main/paradox/serialization-jackson.md @@ -205,7 +205,7 @@ We will look at a few scenarios of how the classes may be evolved. Removing a field can be done without any migration code. The Jackson serializer will ignore properties that does not exist in the class. -### Add Field +### Add Optional Field Adding an optional field can be done without any migration code. The default value will be @scala[None]@java[`Optional.empty`]. @@ -226,6 +226,8 @@ Scala Java : @@snip [ItemAdded.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v2a/ItemAdded.java) { #add-optional } +### Add Mandatory Field + Let's say we want to have a mandatory `discount` property without default value instead: Scala @@ -361,6 +363,63 @@ binding, but it should still be possible to deserialize old data with Jackson. It's a list of class names or prefixes of class names. +## Rolling updates + +When doing a rolling update, for a period of time there are two different binaries running in production. If the schema +has evolved requiring a new schema version, the data serialized by the new binary will be unreadable from the old +binary. This situation causes transient errors on the processes running the old binary. This service degradation is +usually fine since the rolling update will eventually complete and all old processes will be replaced with the new +binary. To avoid this service degradation you can also use forward-one support in your schema evolutions. + +To complete a no-degradation rolling update, you need to make two deployments. First, deploy a new binary which can read +the new schema but still uses the old schema. Then, deploy a second binary which serializes data using the new schema +and drops the downcasting code from the migration. + +Let's take, for example, the case above where we [renamed a field](#rename-field). + +The starting schema is: + +Scala +: @@snip [ItemAdded.java](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1/ItemAdded.scala) { #add-optional } + +Java +: @@snip [ItemAdded.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1/ItemAdded.java) { #add-optional } + +In a first deployment, we still don't make any change to the event class: + +Scala +: @@snip [ItemAdded.scala](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1/ItemAdded.scala) { #forward-one-rename } + +Java +: @@snip [ItemAdded.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1/ItemAdded.java) { #forward-one-rename } + +but we introduce a migration that can read the newer schema which is versioned `2`: + +Scala +: @@snip [ItemAddedMigration.scala](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1withv2/ItemAddedMigration.scala) { #forward-one-rename } + +Java +: @@snip [ItemAddedMigration.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1withv2/ItemAddedMigration.java) { #forward-one-rename } + +Once all running nodes have the new migration code which can read version `2` of `ItemAdded` we can proceed with the +second step. So, we deploy the updated event: + +Scala +: @@snip [ItemAdded.scala](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v2c/ItemAdded.scala) { #rename } + +Java +: @@snip [ItemAdded.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v2c/ItemAdded.java) { #rename } + +and the final migration code which no longer needs forward-compatibility code: + +Scala +: @@snip [ItemAddedMigration.scala](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v2c/ItemAddedMigration.scala) { #rename } + +Java +: @@snip [ItemAddedMigration.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v2c/ItemAddedMigration.java) { #rename } + + + ## Jackson Modules The following Jackson modules are enabled by default: diff --git a/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonMigration.scala b/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonMigration.scala index c39b2a17e0..58f61173cc 100644 --- a/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonMigration.scala +++ b/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonMigration.scala @@ -22,11 +22,22 @@ import akka.util.unused abstract class JacksonMigration { /** - * Define current version. The first version, when no migration was used, - * is always 1. + * Define current version, that is, the value used when serializing new data. The first version, when no + * migration was used, is always 1. */ def currentVersion: Int + /** + * Define the supported forward version this migration can read (must be greater or equal than `currentVersion`). + * If this value is different from [[currentVersion]] a [[JacksonMigration]] may be required to downcast + * the received payload to the current schema. + */ + def supportedForwardVersion: Int = currentVersion + + require( + currentVersion <= supportedForwardVersion, + s"""The "currentVersion" [$currentVersion] of a JacksonMigration must be less or equal to the "supportedForwardVersion" [$supportedForwardVersion].""") + /** * Override this method if you have changed the class name. Return * current class name. diff --git a/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonSerializer.scala b/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonSerializer.scala index c3f5084205..8c1cf9f4c1 100644 --- a/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonSerializer.scala +++ b/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonSerializer.scala @@ -319,11 +319,16 @@ import akka.util.OptionVal val className = migration match { case Some(transformer) if fromVersion < transformer.currentVersion => transformer.transformClassName(fromVersion, manifestClassName) - case Some(transformer) if fromVersion > transformer.currentVersion => + case Some(transformer) if fromVersion == transformer.currentVersion => + manifestClassName + case Some(transformer) if fromVersion <= transformer.supportedForwardVersion => + transformer.transformClassName(fromVersion, manifestClassName) + case Some(transformer) if fromVersion > transformer.supportedForwardVersion => throw new IllegalStateException( - s"Migration version ${transformer.currentVersion} is " + + s"Migration version ${transformer.supportedForwardVersion} is " + s"behind version $fromVersion of deserialized type [$manifestClassName]") - case _ => manifestClassName + case None => + manifestClassName } if (typeInManifest && (className ne manifestClassName)) @@ -359,7 +364,13 @@ import akka.util.OptionVal val jsonTree = objectMapper.readTree(decompressedBytes) val newJsonTree = transformer.transform(fromVersion, jsonTree) objectMapper.treeToValue(newJsonTree, clazz) - case _ => + case Some(transformer) if fromVersion == transformer.currentVersion => + objectMapper.readValue(decompressedBytes, clazz) + case Some(transformer) if fromVersion <= transformer.supportedForwardVersion => + val jsonTree = objectMapper.readTree(decompressedBytes) + val newJsonTree = transformer.transform(fromVersion, jsonTree) + objectMapper.treeToValue(newJsonTree, clazz) + case None => objectMapper.readValue(decompressedBytes, clazz) } diff --git a/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigration.java b/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigrationV2.java similarity index 82% rename from akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigration.java rename to akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigrationV2.java index c66716b54b..0a0fa12cb4 100644 --- a/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigration.java +++ b/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigrationV2.java @@ -9,15 +9,16 @@ import com.fasterxml.jackson.databind.node.IntNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.JsonNode; -public class JavaTestEventMigration extends JacksonMigration { +public class JavaTestEventMigrationV2 extends JacksonMigration { @Override public int currentVersion() { - return 3; + return 2; } @Override public String transformClassName(int fromVersion, String className) { + // Ignore the incoming manifest and produce the same class name always. return JavaTestMessages.Event2.class.getName(); } diff --git a/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigrationV2WithV3.java b/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigrationV2WithV3.java new file mode 100644 index 0000000000..b2ef413583 --- /dev/null +++ b/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigrationV2WithV3.java @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2016-2020 Lightbend Inc. + */ + +package akka.serialization.jackson; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.IntNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +public class JavaTestEventMigrationV2WithV3 extends JacksonMigration { + + @Override + public int currentVersion() { + return 2; + } + + @Override + public int supportedForwardVersion() { + return 3; + } + + @Override + public String transformClassName(int fromVersion, String className) { + // Always produce the type of the currentVersion. When fromVersion is lower, + // transform will lift it. When fromVersion is higher, transform will downcast it. + return JavaTestMessages.Event2.class.getName(); + } + + @Override + public JsonNode transform(int fromVersion, JsonNode json) { + ObjectNode root = (ObjectNode) json; + if (fromVersion < 2) { + root = upcastV1ToV2((ObjectNode) json); + } + if (fromVersion == 3) { + root = downcastV3ToV2((ObjectNode) json); + } + return root; + } + + private ObjectNode upcastV1ToV2(ObjectNode json) { + ObjectNode root = json; + root.set("field1V2", root.get("field1")); + root.remove("field1"); + root.set("field2", IntNode.valueOf(17)); + return root; + } + + private ObjectNode downcastV3ToV2(ObjectNode json) { + ObjectNode root = json; + root.set("field2", root.get("field3")); + root.remove("field3"); + return root; + } +} diff --git a/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigrationV3.java b/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigrationV3.java new file mode 100644 index 0000000000..09fbea9b74 --- /dev/null +++ b/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigrationV3.java @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2016-2020 Lightbend Inc. + */ + +package akka.serialization.jackson; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.IntNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +public class JavaTestEventMigrationV3 extends JacksonMigration { + + @Override + public int currentVersion() { + return 3; + } + + @Override + public String transformClassName(int fromVersion, String className) { + // Always produce the type of the currentVersion. When fromVersion is lower, + // transform will lift it. when fromVersion is higher, transform will adapt it. + return JavaTestMessages.Event3.class.getName(); + } + + @Override + public JsonNode transform(int fromVersion, JsonNode json) { + ObjectNode root = (ObjectNode) json; + if (fromVersion < 2) { + root = upcastV1ToV2(root); + } + if (fromVersion < 3) { + root = upcastV2ToV3(root); + } + return root; + } + + private ObjectNode upcastV1ToV2(ObjectNode root) { + root.set("field1V2", root.get("field1")); + root.remove("field1"); + root.set("field2", IntNode.valueOf(17)); + return root; + } + + private ObjectNode upcastV2ToV3(ObjectNode root) { + root.set("field3", root.get("field2")); + root.remove("field2"); + return root; + } +} diff --git a/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestMessages.java b/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestMessages.java index 4a5e0e674b..3bc178a6cc 100644 --- a/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestMessages.java +++ b/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestMessages.java @@ -386,6 +386,42 @@ public interface JavaTestMessages { } } + public class Event3 implements TestMessage { + private final String field1V2; // same as in Event2 + private final int field3; // renamed field (was field2) + + public Event3(String field1V2, int field3) { + this.field1V2 = field1V2; + this.field3 = field3; + } + + public String getField1V2() { + return field1V2; + } + + public int getField3() { + return field3; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Event3 event3 = (Event3) o; + + if (field3 != event3.field3) return false; + return field1V2 != null ? field1V2.equals(event3.field1V2) : event3.field1V2 == null; + } + + @Override + public int hashCode() { + int result = field1V2 != null ? field1V2.hashCode() : 0; + result = 31 * result + field3; + return result; + } + } + public class Zoo implements TestMessage { public final Animal first; diff --git a/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1/ItemAdded.java b/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1/ItemAdded.java index 6a520ade67..1cc1e1aa79 100644 --- a/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1/ItemAdded.java +++ b/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1/ItemAdded.java @@ -7,6 +7,7 @@ package jdoc.akka.serialization.jackson.v1; import jdoc.akka.serialization.jackson.MySerializable; // #add-optional +// #forward-one-rename public class ItemAdded implements MySerializable { public final String shoppingCartId; public final String productId; @@ -18,4 +19,5 @@ public class ItemAdded implements MySerializable { this.quantity = quantity; } } +// #forward-one-rename // #add-optional diff --git a/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1withv2/ItemAddedMigration.java b/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1withv2/ItemAddedMigration.java new file mode 100644 index 0000000000..dbdb847505 --- /dev/null +++ b/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1withv2/ItemAddedMigration.java @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package jdoc.akka.serialization.jackson.v1withv2; + +// #forward-one-rename + +import akka.serialization.jackson.JacksonMigration; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +public class ItemAddedMigration extends JacksonMigration { + + // Data produced in this node is still produced using the version 1 of the schema + @Override + public int currentVersion() { + return 1; + } + + @Override + public int supportedForwardVersion() { + return 2; + } + + @Override + public JsonNode transform(int fromVersion, JsonNode json) { + ObjectNode root = (ObjectNode) json; + if (fromVersion == 2) { + // When receiving an event of version 2 we down-cast it to the version 1 of the schema + root.set("productId", root.get("itemId")); + root.remove("itemId"); + } + return root; + } +} +// #forward-one-rename diff --git a/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JacksonSerializerSpec.scala b/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JacksonSerializerSpec.scala index c82bf11a45..29229a0f8b 100644 --- a/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JacksonSerializerSpec.scala +++ b/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JacksonSerializerSpec.scala @@ -4,6 +4,7 @@ package akka.serialization.jackson +import java.lang import java.nio.charset.StandardCharsets import java.time.Duration import java.time.Instant @@ -13,12 +14,16 @@ import java.util.Arrays import java.util.Locale import java.util.Optional import java.util.UUID + import java.util.logging.FileHandler import scala.collection.immutable import scala.concurrent.duration._ -import scala.concurrent.duration.FiniteDuration - +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.Address +import akka.actor.BootstrapSetup +import akka.actor.ExtendedActorSystem import com.fasterxml.jackson.annotation.JsonSubTypes import com.fasterxml.jackson.annotation.JsonTypeInfo import com.fasterxml.jackson.core.JsonFactory @@ -28,15 +33,12 @@ import com.fasterxml.jackson.core.StreamReadFeature import com.fasterxml.jackson.core.StreamWriteFeature import com.fasterxml.jackson.core.`type`.TypeReference import com.fasterxml.jackson.databind.DeserializationFeature -import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.MapperFeature import com.fasterxml.jackson.databind.Module import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.SerializationFeature import com.fasterxml.jackson.databind.exc.InvalidTypeIdException import com.fasterxml.jackson.databind.json.JsonMapper -import com.fasterxml.jackson.databind.node.IntNode -import com.fasterxml.jackson.databind.node.ObjectNode import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule import com.fasterxml.jackson.module.scala.JsonScalaEnumeration import com.github.ghik.silencer.silent @@ -45,11 +47,7 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike -import akka.actor.ActorRef -import akka.actor.ActorSystem -import akka.actor.Address -import akka.actor.BootstrapSetup -import akka.actor.ExtendedActorSystem +import scala.concurrent.duration.FiniteDuration import akka.actor.Status import akka.actor.setup.ActorSystemSetup import akka.actor.typed.scaladsl.Behaviors @@ -85,6 +83,7 @@ object ScalaTestMessages { final case class Event1(field1: String) extends TestMessage final case class Event2(field1V2: String, field2: Int) extends TestMessage + final case class Event3(field1V2: String, field3: Int) extends TestMessage final case class Zoo(first: Animal) extends TestMessage @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @@ -117,21 +116,6 @@ object ScalaTestMessages { } -class ScalaTestEventMigration extends JacksonMigration { - override def currentVersion = 3 - - override def transformClassName(fromVersion: Int, className: String): String = - classOf[ScalaTestMessages.Event2].getName - - override def transform(fromVersion: Int, json: JsonNode): JsonNode = { - val root = json.asInstanceOf[ObjectNode] - root.set[JsonNode]("field1V2", root.get("field1")) - root.remove("field1") - root.set[JsonNode]("field2", IntNode.valueOf(17)) - root - } -} - class JacksonCborSerializerSpec extends JacksonSerializerSpec("jackson-cbor") { "have compression disabled by default" in { val conf = JacksonObjectMapperProvider.configForBinding("jackson-cbor", system.settings.config) @@ -621,17 +605,8 @@ class JacksonJsonSerializerSpec extends JacksonSerializerSpec("jackson-json") { } } -abstract class JacksonSerializerSpec(serializerName: String) - extends TestKit( - ActorSystem( - "JacksonJsonSerializerSpec", - ConfigFactory.parseString(s""" - akka.serialization.jackson.migrations { - "akka.serialization.jackson.JavaTestMessages$$Event1" = "akka.serialization.jackson.JavaTestEventMigration" - "akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigration" - "akka.serialization.jackson.ScalaTestMessages$$Event1" = "akka.serialization.jackson.ScalaTestEventMigration" - "akka.serialization.jackson.ScalaTestMessages$$Event2" = "akka.serialization.jackson.ScalaTestEventMigration" - } +object JacksonSerializerSpec { + def baseConfig(serializerName: String): String = s""" akka.actor { serialization-bindings { "akka.serialization.jackson.ScalaTestMessages$$TestMessage" = $serializerName @@ -639,7 +614,14 @@ abstract class JacksonSerializerSpec(serializerName: String) } } akka.serialization.jackson.allowed-class-prefix = ["akka.serialization.jackson.ScalaTestMessages$$OldCommand"] - """))) + """ +} + +abstract class JacksonSerializerSpec(serializerName: String) + extends TestKit( + ActorSystem( + "JacksonJsonSerializerSpec", + ConfigFactory.parseString(JacksonSerializerSpec.baseConfig(serializerName)))) with AnyWordSpecLike with Matchers with BeforeAndAfterAll { @@ -772,22 +754,138 @@ abstract class JacksonSerializerSpec(serializerName: String) } } - "deserialize with migrations" in { + // TODO: Consider moving the migrations Specs to a separate Spec + "deserialize with migrations" in withSystem(s""" + akka.serialization.jackson.migrations { + ## Usually the key is a FQCN but we're hacking the name to use multiple migrations for the + ## same type in a single test. + "deserialize-Java.Event1-into-Java.Event3" = "akka.serialization.jackson.JavaTestEventMigrationV3" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sys => val event1 = new Event1("a") - val serializer = serializerFor(event1) + val serializer = serializerFor(event1, sys) val blob = serializer.toBinary(event1) - val event2 = serializer.fromBinary(blob, classOf[Event1].getName).asInstanceOf[Event2] - event1.getField1 should ===(event2.getField1V2) - event2.getField2 should ===(17) + + // Event1 has no migration configured so it uses the default manifest name (with no version) + serializer.manifest(event1) should ===(classOf[Event1].getName) + + // Hack the manifest to enforce the use a particular migration when deserializing the blob of Event1 + val event3 = serializer.fromBinary(blob, "deserialize-Java.Event1-into-Java.Event3").asInstanceOf[Event3] + event1.getField1 should ===(event3.getField1V2) + event3.getField3 should ===(17) } "deserialize with migrations from V2" in { + // produce a blob/manifest from an ActorSystem without migrations val event1 = new Event1("a") val serializer = serializerFor(event1) val blob = serializer.toBinary(event1) - val event2 = serializer.fromBinary(blob, classOf[Event1].getName + "#2").asInstanceOf[Event2] - event1.getField1 should ===(event2.getField1V2) - event2.getField2 should ===(17) + val manifest = serializer.manifest(event1) + + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.JavaTestMessages$$Event1" = "akka.serialization.jackson.JavaTestEventMigrationV2" + "akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigrationV2" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 => + // read the blob/manifest from an ActorSystem with migrations + val serializerV2: JacksonSerializer = serializerFor(event1, sysV2) + val event2 = serializerV2.fromBinary(blob, manifest).asInstanceOf[Event2] + event1.getField1 should ===(event2.getField1V2) + event2.getField2 should ===(17) + + // Event2 has a migration configured so it uses a manifest with a version + val serializerFor2 = serializerFor(event2, sysV2) + serializerFor2.manifest(event2) should ===(classOf[Event2].getName + "#2") + } + + } + + "use the migration's currentVersion on new serializations" in { + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigrationV2" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 => + val event2 = new Event2("a", 17) + // Event2 has a migration configured so it uses a manifest with a version + val serializer2 = serializerFor(event2, sysV2) + serializer2.manifest(event2) should ===(classOf[Event2].getName + "#2") + } + } + + "use the migration's currentVersion on new serializations when supporting forward versions" in { + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigrationV2WithV3" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 => + val event2 = new Event2("a", 17) + // Event2 has a migration configured so it uses a manifest with a version + val serializer2 = serializerFor(event2, sysV2) + serializer2.manifest(event2) should ===(classOf[Event2].getName + "#2") + } + } + + "deserialize a V3 blob into a V2 class (forward-one support) and back" in { + + val blobV3 = + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.JavaTestMessages$$Event3" = "akka.serialization.jackson.JavaTestEventMigrationV3" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV3 => + val event3 = new Event3("Steve", 49) + val serializer = serializerFor(event3, sysV3) + val blob = serializer.toBinary(event3) + blob + } + + val blobV2 = + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigrationV2WithV3" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2WithV3 => + val serializerForEvent2 = + serialization(sysV2WithV3).serializerFor(classOf[Event2]).asInstanceOf[JacksonSerializer] + val event2 = serializerForEvent2.fromBinary(blobV3, classOf[Event2].getName + "#3").asInstanceOf[Event2] + event2.getField1V2 should ===("Steve") + event2.getField2 should ===(49) + serializerForEvent2.toBinary(event2) + } + + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.JavaTestMessages$$Event3" = "akka.serialization.jackson.JavaTestEventMigrationV3" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV3 => + val serializerForEvent3 = serialization(sysV3).serializerFor(classOf[Event3]).asInstanceOf[JacksonSerializer] + val event3 = serializerForEvent3.fromBinary(blobV2, classOf[Event3].getName + "#2").asInstanceOf[Event3] + event3.getField1V2 should ===("Steve") + event3.getField3 should ===(49) + } + } + + "deserialize unsupported versions throws an exception" in { + intercept[lang.IllegalStateException] { + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.JavaTestMessages$$Event1" = "akka.serialization.jackson.JavaTestEventMigrationV2" + "akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigrationV2" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 => + // produce a blob/manifest from an ActorSystem without migrations + val event1 = new Event1("a") + val serializer = serializerFor(event1) + val blob = serializer.toBinary(event1) + val manifest = serializer.manifest(event1) + // Event1 has no migration configured so it uses the default manifest name (with no version) + val serializerV2: JacksonSerializer = serializerFor(event1, sysV2) + serializerV2.fromBinary(blob, manifest + "#9").asInstanceOf[Event2] + } + + } } } @@ -882,22 +980,125 @@ abstract class JacksonSerializerSpec(serializerName: String) } } - "deserialize with migrations" in { + // TODO: Consider moving the migrations Specs to a separate Spec + "deserialize with migrations" in withSystem(s""" + akka.serialization.jackson.migrations { + ## Usually the key is a FQCN but we're hacking the name to use multiple migrations for the + ## same type in a single test. + "deserialize-Event1-into-Event3" = "akka.serialization.jackson.ScalaTestEventMigrationV3" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sys => val event1 = Event1("a") - val serializer = serializerFor(event1) + val serializer = serializerFor(event1, sys) val blob = serializer.toBinary(event1) - val event2 = serializer.fromBinary(blob, classOf[Event1].getName).asInstanceOf[Event2] - event1.field1 should ===(event2.field1V2) - event2.field2 should ===(17) + + // Event1 has no migration configured so it uses the default manifest name (with no version) + serializer.manifest(event1) should ===(classOf[Event1].getName) + + // Hack the manifest to enforce the use a particular migration when deserializing the blob of Event1 + val event3 = serializer.fromBinary(blob, "deserialize-Event1-into-Event3").asInstanceOf[Event3] + event1.field1 should ===(event3.field1V2) + event3.field3 should ===(17) } "deserialize with migrations from V2" in { + // produce a blob/manifest from an ActorSystem without migrations val event1 = Event1("a") val serializer = serializerFor(event1) val blob = serializer.toBinary(event1) - val event2 = serializer.fromBinary(blob, classOf[Event1].getName + "#2").asInstanceOf[Event2] - event1.field1 should ===(event2.field1V2) - event2.field2 should ===(17) + val manifest = serializer.manifest(event1) + + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.ScalaTestMessages$$Event1" = "akka.serialization.jackson.ScalaTestEventMigrationV2" + "akka.serialization.jackson.ScalaTestMessages$$Event2" = "akka.serialization.jackson.ScalaTestEventMigrationV2" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 => + // read the blob/manifest from an ActorSystem with migrations + val serializerV2: JacksonSerializer = serializerFor(event1, sysV2) + val event2 = serializerV2.fromBinary(blob, manifest).asInstanceOf[Event2] + event1.field1 should ===(event2.field1V2) + event2.field2 should ===(17) + + // Event2 has a migration configured so it uses a manifest with a version + val serializerFor2 = serializerFor(event2, sysV2) + serializerFor2.manifest(event2) should ===(classOf[Event2].getName + "#2") + } + + } + + "use the migration's currentVersion on new serializations" in { + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.ScalaTestMessages$$Event2" = "akka.serialization.jackson.ScalaTestEventMigrationV2" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 => + val event2 = new Event2("a", 17) + // Event2 has a migration configured so it uses a manifest with a version + val serializer2 = serializerFor(event2, sysV2) + serializer2.manifest(event2) should ===(classOf[Event2].getName + "#2") + } + } + + "deserialize a V3 blob into a V2 class (forward-one support) and back" in { + + val blobV3 = + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.ScalaTestMessages$$Event3" = "akka.serialization.jackson.ScalaTestEventMigrationV3" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV3 => + val event3 = new Event3("Steve", 49) + val serializer = serializerFor(event3, sysV3) + val blob = serializer.toBinary(event3) + blob + } + + val blobV2 = + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.ScalaTestMessages$$Event2" = "akka.serialization.jackson.ScalaTestEventMigrationV2WithV3" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2WithV3 => + val serializerForEvent2 = + serialization(sysV2WithV3).serializerFor(classOf[Event2]).asInstanceOf[JacksonSerializer] + val event2 = serializerForEvent2.fromBinary(blobV3, classOf[Event2].getName + "#3").asInstanceOf[Event2] + event2.field1V2 should ===("Steve") + event2.field2 should ===(49) + serializerForEvent2.toBinary(event2) + } + + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.ScalaTestMessages$$Event3" = "akka.serialization.jackson.ScalaTestEventMigrationV3" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV3 => + val serializerForEvent3 = serialization(sysV3).serializerFor(classOf[Event3]).asInstanceOf[JacksonSerializer] + val event3 = serializerForEvent3.fromBinary(blobV2, classOf[Event3].getName + "#2").asInstanceOf[Event3] + event3.field1V2 should ===("Steve") + event3.field3 should ===(49) + } + } + + "deserialize unsupported versions throws an exception" in { + intercept[lang.IllegalStateException] { + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.ScalaTestMessages$$Event1" = "akka.serialization.jackson.ScalaTestEventMigrationV2" + "akka.serialization.jackson.ScalaTestMessages$$Event2" = "akka.serialization.jackson.ScalaTestEventMigrationV2" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 => + // produce a blob/manifest from an ActorSystem without migrations + val event1 = new Event1("a") + val serializer = serializerFor(event1) + val blob = serializer.toBinary(event1) + val manifest = serializer.manifest(event1) + // Event1 has no migration configured so it uses the default manifest name (with no version) + val serializerV2: JacksonSerializer = serializerFor(event1, sysV2) + serializerV2.fromBinary(blob, manifest + "#9").asInstanceOf[Event2] + } + + } } "not allow serialization of deny listed class" in { diff --git a/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/ScalaTestEventMigration.scala b/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/ScalaTestEventMigration.scala new file mode 100644 index 0000000000..b50dddb273 --- /dev/null +++ b/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/ScalaTestEventMigration.scala @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.serialization.jackson + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.IntNode +import com.fasterxml.jackson.databind.node.ObjectNode + +object ScalaTestEventMigration { + def upcastV1ToV2(root: ObjectNode): ObjectNode = { + root.set[JsonNode]("field1V2", root.get("field1")) + root.remove("field1") + root.set[JsonNode]("field2", IntNode.valueOf(17)) + root + } + + def upcastV2ToV3(root: ObjectNode): ObjectNode = { + root.set("field3", root.get("field2")) + root.remove("field2") + root + } + + def downcastV3ToV2(root: ObjectNode) = { + // downcast the V3 representation to the V2 representation. A field + // is renamed. + root.set("field2", root.get("field3")) + root.remove("field3") + root + } + +} + +class ScalaTestEventMigrationV2 extends JacksonMigration { + import ScalaTestEventMigration._ + + override def currentVersion = 2 + + override def transformClassName(fromVersion: Int, className: String): String = + classOf[ScalaTestMessages.Event2].getName + + override def transform(fromVersion: Int, json: JsonNode): JsonNode = { + val root = json.asInstanceOf[ObjectNode] + upcastV1ToV2(root) + } + +} + +class ScalaTestEventMigrationV2WithV3 extends JacksonMigration { + import ScalaTestEventMigration._ + + override def currentVersion = 2 + + override def supportedForwardVersion: Int = 3 + + // Always produce the type of the currentVersion. When fromVersion is lower, + // transform will lift it. When fromVersion is higher, transform will downcast it. + override def transformClassName(fromVersion: Int, className: String): String = + classOf[ScalaTestMessages.Event2].getName + + override def transform(fromVersion: Int, json: JsonNode): JsonNode = { + var root = json.asInstanceOf[ObjectNode] + if (fromVersion < 2) { + root = upcastV1ToV2(root) + } + if (fromVersion == 3) { + root = downcastV3ToV2(root) + } + root + } + +} + +class ScalaTestEventMigrationV3 extends JacksonMigration { + import ScalaTestEventMigration._ + + override def currentVersion = 3 + + override def transformClassName(fromVersion: Int, className: String): String = + classOf[ScalaTestMessages.Event3].getName + + override def transform(fromVersion: Int, json: JsonNode): JsonNode = { + var root = json.asInstanceOf[ObjectNode] + if (fromVersion < 2) { + root = upcastV1ToV2(root) + } + if (fromVersion < 3) { + root = upcastV2ToV3(root) + } + root + } + +} diff --git a/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1/ItemAdded.scala b/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1/ItemAdded.scala index 2fa81b2360..86cc7134ef 100644 --- a/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1/ItemAdded.scala +++ b/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1/ItemAdded.scala @@ -7,5 +7,7 @@ package doc.akka.serialization.jackson.v1 import doc.akka.serialization.jackson.MySerializable // #add-optional +// #forward-one-rename case class ItemAdded(shoppingCartId: String, productId: String, quantity: Int) extends MySerializable +// #forward-one-rename // #add-optional diff --git a/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1withv2/ItemAddedMigration.scala b/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1withv2/ItemAddedMigration.scala new file mode 100644 index 0000000000..a2aa09cc35 --- /dev/null +++ b/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1withv2/ItemAddedMigration.scala @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package doc.akka.serialization.jackson.v1withv2 + +// #forward-one-rename +import akka.serialization.jackson.JacksonMigration +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.ObjectNode + +class ItemAddedMigration extends JacksonMigration { + + // Data produced in this node is still produced using the version 1 of the schema + override def currentVersion: Int = 1 + + override def supportedForwardVersion: Int = 2 + + override def transform(fromVersion: Int, json: JsonNode): JsonNode = { + val root = json.asInstanceOf[ObjectNode] + if (fromVersion == 2) { + // When receiving an event of version 2 we down-cast it to the version 1 of the schema + root.set[JsonNode]("productId", root.get("itemId")) + root.remove("itemId") + } + root + } +} +// #forward-one-rename