Event migration improvements (#29514)
This commit is contained in:
parent
57fb9e9093
commit
728dda874e
14 changed files with 651 additions and 63 deletions
|
|
@ -224,7 +224,7 @@ needs to have an associated code which indicates if it is a window or aisle seat
|
|||
Adding fields is the most common change you'll need to apply to your messages so make sure the serialization format
|
||||
you picked for your payloads can handle it apropriately, i.e. such changes should be *binary compatible*.
|
||||
This is achieved using the right serializer toolkit. In the following examples we will be using protobuf.
|
||||
See also @ref:[how to add fields with Jackson](serialization-jackson.md#add-field).
|
||||
See also @ref:[how to add fields with Jackson](serialization-jackson.md#add-optional-field).
|
||||
|
||||
While being able to read messages with missing fields is half of the solution, you also need to deal with the missing
|
||||
values somehow. This is usually modeled as some kind of default value, or by representing the field as an @scala[`Option[T]`]@java[`Optional<T>`]
|
||||
|
|
|
|||
|
|
@ -205,7 +205,7 @@ We will look at a few scenarios of how the classes may be evolved.
|
|||
Removing a field can be done without any migration code. The Jackson serializer will ignore properties that does
|
||||
not exist in the class.
|
||||
|
||||
### Add Field
|
||||
### Add Optional Field
|
||||
|
||||
Adding an optional field can be done without any migration code. The default value will be @scala[None]@java[`Optional.empty`].
|
||||
|
||||
|
|
@ -226,6 +226,8 @@ Scala
|
|||
Java
|
||||
: @@snip [ItemAdded.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v2a/ItemAdded.java) { #add-optional }
|
||||
|
||||
### Add Mandatory Field
|
||||
|
||||
Let's say we want to have a mandatory `discount` property without default value instead:
|
||||
|
||||
Scala
|
||||
|
|
@ -361,6 +363,63 @@ binding, but it should still be possible to deserialize old data with Jackson.
|
|||
|
||||
It's a list of class names or prefixes of class names.
|
||||
|
||||
## Rolling updates
|
||||
|
||||
When doing a rolling update, for a period of time there are two different binaries running in production. If the schema
|
||||
has evolved requiring a new schema version, the data serialized by the new binary will be unreadable from the old
|
||||
binary. This situation causes transient errors on the processes running the old binary. This service degradation is
|
||||
usually fine since the rolling update will eventually complete and all old processes will be replaced with the new
|
||||
binary. To avoid this service degradation you can also use forward-one support in your schema evolutions.
|
||||
|
||||
To complete a no-degradation rolling update, you need to make two deployments. First, deploy a new binary which can read
|
||||
the new schema but still uses the old schema. Then, deploy a second binary which serializes data using the new schema
|
||||
and drops the downcasting code from the migration.
|
||||
|
||||
Let's take, for example, the case above where we [renamed a field](#rename-field).
|
||||
|
||||
The starting schema is:
|
||||
|
||||
Scala
|
||||
: @@snip [ItemAdded.java](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1/ItemAdded.scala) { #add-optional }
|
||||
|
||||
Java
|
||||
: @@snip [ItemAdded.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1/ItemAdded.java) { #add-optional }
|
||||
|
||||
In a first deployment, we still don't make any change to the event class:
|
||||
|
||||
Scala
|
||||
: @@snip [ItemAdded.scala](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1/ItemAdded.scala) { #forward-one-rename }
|
||||
|
||||
Java
|
||||
: @@snip [ItemAdded.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1/ItemAdded.java) { #forward-one-rename }
|
||||
|
||||
but we introduce a migration that can read the newer schema which is versioned `2`:
|
||||
|
||||
Scala
|
||||
: @@snip [ItemAddedMigration.scala](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1withv2/ItemAddedMigration.scala) { #forward-one-rename }
|
||||
|
||||
Java
|
||||
: @@snip [ItemAddedMigration.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1withv2/ItemAddedMigration.java) { #forward-one-rename }
|
||||
|
||||
Once all running nodes have the new migration code which can read version `2` of `ItemAdded` we can proceed with the
|
||||
second step. So, we deploy the updated event:
|
||||
|
||||
Scala
|
||||
: @@snip [ItemAdded.scala](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v2c/ItemAdded.scala) { #rename }
|
||||
|
||||
Java
|
||||
: @@snip [ItemAdded.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v2c/ItemAdded.java) { #rename }
|
||||
|
||||
and the final migration code which no longer needs forward-compatibility code:
|
||||
|
||||
Scala
|
||||
: @@snip [ItemAddedMigration.scala](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v2c/ItemAddedMigration.scala) { #rename }
|
||||
|
||||
Java
|
||||
: @@snip [ItemAddedMigration.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v2c/ItemAddedMigration.java) { #rename }
|
||||
|
||||
|
||||
|
||||
## Jackson Modules
|
||||
|
||||
The following Jackson modules are enabled by default:
|
||||
|
|
|
|||
|
|
@ -22,11 +22,22 @@ import akka.util.unused
|
|||
abstract class JacksonMigration {
|
||||
|
||||
/**
|
||||
* Define current version. The first version, when no migration was used,
|
||||
* is always 1.
|
||||
* Define current version, that is, the value used when serializing new data. The first version, when no
|
||||
* migration was used, is always 1.
|
||||
*/
|
||||
def currentVersion: Int
|
||||
|
||||
/**
|
||||
* Define the supported forward version this migration can read (must be greater or equal than `currentVersion`).
|
||||
* If this value is different from [[currentVersion]] a [[JacksonMigration]] may be required to downcast
|
||||
* the received payload to the current schema.
|
||||
*/
|
||||
def supportedForwardVersion: Int = currentVersion
|
||||
|
||||
require(
|
||||
currentVersion <= supportedForwardVersion,
|
||||
s"""The "currentVersion" [$currentVersion] of a JacksonMigration must be less or equal to the "supportedForwardVersion" [$supportedForwardVersion].""")
|
||||
|
||||
/**
|
||||
* Override this method if you have changed the class name. Return
|
||||
* current class name.
|
||||
|
|
|
|||
|
|
@ -319,11 +319,16 @@ import akka.util.OptionVal
|
|||
val className = migration match {
|
||||
case Some(transformer) if fromVersion < transformer.currentVersion =>
|
||||
transformer.transformClassName(fromVersion, manifestClassName)
|
||||
case Some(transformer) if fromVersion > transformer.currentVersion =>
|
||||
case Some(transformer) if fromVersion == transformer.currentVersion =>
|
||||
manifestClassName
|
||||
case Some(transformer) if fromVersion <= transformer.supportedForwardVersion =>
|
||||
transformer.transformClassName(fromVersion, manifestClassName)
|
||||
case Some(transformer) if fromVersion > transformer.supportedForwardVersion =>
|
||||
throw new IllegalStateException(
|
||||
s"Migration version ${transformer.currentVersion} is " +
|
||||
s"Migration version ${transformer.supportedForwardVersion} is " +
|
||||
s"behind version $fromVersion of deserialized type [$manifestClassName]")
|
||||
case _ => manifestClassName
|
||||
case None =>
|
||||
manifestClassName
|
||||
}
|
||||
|
||||
if (typeInManifest && (className ne manifestClassName))
|
||||
|
|
@ -359,7 +364,13 @@ import akka.util.OptionVal
|
|||
val jsonTree = objectMapper.readTree(decompressedBytes)
|
||||
val newJsonTree = transformer.transform(fromVersion, jsonTree)
|
||||
objectMapper.treeToValue(newJsonTree, clazz)
|
||||
case _ =>
|
||||
case Some(transformer) if fromVersion == transformer.currentVersion =>
|
||||
objectMapper.readValue(decompressedBytes, clazz)
|
||||
case Some(transformer) if fromVersion <= transformer.supportedForwardVersion =>
|
||||
val jsonTree = objectMapper.readTree(decompressedBytes)
|
||||
val newJsonTree = transformer.transform(fromVersion, jsonTree)
|
||||
objectMapper.treeToValue(newJsonTree, clazz)
|
||||
case None =>
|
||||
objectMapper.readValue(decompressedBytes, clazz)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -9,15 +9,16 @@ import com.fasterxml.jackson.databind.node.IntNode;
|
|||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
|
||||
public class JavaTestEventMigration extends JacksonMigration {
|
||||
public class JavaTestEventMigrationV2 extends JacksonMigration {
|
||||
|
||||
@Override
|
||||
public int currentVersion() {
|
||||
return 3;
|
||||
return 2;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String transformClassName(int fromVersion, String className) {
|
||||
// Ignore the incoming manifest and produce the same class name always.
|
||||
return JavaTestMessages.Event2.class.getName();
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,56 @@
|
|||
/*
|
||||
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.serialization.jackson;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.IntNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
|
||||
public class JavaTestEventMigrationV2WithV3 extends JacksonMigration {
|
||||
|
||||
@Override
|
||||
public int currentVersion() {
|
||||
return 2;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int supportedForwardVersion() {
|
||||
return 3;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String transformClassName(int fromVersion, String className) {
|
||||
// Always produce the type of the currentVersion. When fromVersion is lower,
|
||||
// transform will lift it. When fromVersion is higher, transform will downcast it.
|
||||
return JavaTestMessages.Event2.class.getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public JsonNode transform(int fromVersion, JsonNode json) {
|
||||
ObjectNode root = (ObjectNode) json;
|
||||
if (fromVersion < 2) {
|
||||
root = upcastV1ToV2((ObjectNode) json);
|
||||
}
|
||||
if (fromVersion == 3) {
|
||||
root = downcastV3ToV2((ObjectNode) json);
|
||||
}
|
||||
return root;
|
||||
}
|
||||
|
||||
private ObjectNode upcastV1ToV2(ObjectNode json) {
|
||||
ObjectNode root = json;
|
||||
root.set("field1V2", root.get("field1"));
|
||||
root.remove("field1");
|
||||
root.set("field2", IntNode.valueOf(17));
|
||||
return root;
|
||||
}
|
||||
|
||||
private ObjectNode downcastV3ToV2(ObjectNode json) {
|
||||
ObjectNode root = json;
|
||||
root.set("field2", root.get("field3"));
|
||||
root.remove("field3");
|
||||
return root;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.serialization.jackson;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.IntNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
|
||||
public class JavaTestEventMigrationV3 extends JacksonMigration {
|
||||
|
||||
@Override
|
||||
public int currentVersion() {
|
||||
return 3;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String transformClassName(int fromVersion, String className) {
|
||||
// Always produce the type of the currentVersion. When fromVersion is lower,
|
||||
// transform will lift it. when fromVersion is higher, transform will adapt it.
|
||||
return JavaTestMessages.Event3.class.getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public JsonNode transform(int fromVersion, JsonNode json) {
|
||||
ObjectNode root = (ObjectNode) json;
|
||||
if (fromVersion < 2) {
|
||||
root = upcastV1ToV2(root);
|
||||
}
|
||||
if (fromVersion < 3) {
|
||||
root = upcastV2ToV3(root);
|
||||
}
|
||||
return root;
|
||||
}
|
||||
|
||||
private ObjectNode upcastV1ToV2(ObjectNode root) {
|
||||
root.set("field1V2", root.get("field1"));
|
||||
root.remove("field1");
|
||||
root.set("field2", IntNode.valueOf(17));
|
||||
return root;
|
||||
}
|
||||
|
||||
private ObjectNode upcastV2ToV3(ObjectNode root) {
|
||||
root.set("field3", root.get("field2"));
|
||||
root.remove("field2");
|
||||
return root;
|
||||
}
|
||||
}
|
||||
|
|
@ -386,6 +386,42 @@ public interface JavaTestMessages {
|
|||
}
|
||||
}
|
||||
|
||||
public class Event3 implements TestMessage {
|
||||
private final String field1V2; // same as in Event2
|
||||
private final int field3; // renamed field (was field2)
|
||||
|
||||
public Event3(String field1V2, int field3) {
|
||||
this.field1V2 = field1V2;
|
||||
this.field3 = field3;
|
||||
}
|
||||
|
||||
public String getField1V2() {
|
||||
return field1V2;
|
||||
}
|
||||
|
||||
public int getField3() {
|
||||
return field3;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
Event3 event3 = (Event3) o;
|
||||
|
||||
if (field3 != event3.field3) return false;
|
||||
return field1V2 != null ? field1V2.equals(event3.field1V2) : event3.field1V2 == null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = field1V2 != null ? field1V2.hashCode() : 0;
|
||||
result = 31 * result + field3;
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
public class Zoo implements TestMessage {
|
||||
public final Animal first;
|
||||
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ package jdoc.akka.serialization.jackson.v1;
|
|||
import jdoc.akka.serialization.jackson.MySerializable;
|
||||
|
||||
// #add-optional
|
||||
// #forward-one-rename
|
||||
public class ItemAdded implements MySerializable {
|
||||
public final String shoppingCartId;
|
||||
public final String productId;
|
||||
|
|
@ -18,4 +19,5 @@ public class ItemAdded implements MySerializable {
|
|||
this.quantity = quantity;
|
||||
}
|
||||
}
|
||||
// #forward-one-rename
|
||||
// #add-optional
|
||||
|
|
|
|||
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdoc.akka.serialization.jackson.v1withv2;
|
||||
|
||||
// #forward-one-rename
|
||||
|
||||
import akka.serialization.jackson.JacksonMigration;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
|
||||
public class ItemAddedMigration extends JacksonMigration {
|
||||
|
||||
// Data produced in this node is still produced using the version 1 of the schema
|
||||
@Override
|
||||
public int currentVersion() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int supportedForwardVersion() {
|
||||
return 2;
|
||||
}
|
||||
|
||||
@Override
|
||||
public JsonNode transform(int fromVersion, JsonNode json) {
|
||||
ObjectNode root = (ObjectNode) json;
|
||||
if (fromVersion == 2) {
|
||||
// When receiving an event of version 2 we down-cast it to the version 1 of the schema
|
||||
root.set("productId", root.get("itemId"));
|
||||
root.remove("itemId");
|
||||
}
|
||||
return root;
|
||||
}
|
||||
}
|
||||
// #forward-one-rename
|
||||
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
package akka.serialization.jackson
|
||||
|
||||
import java.lang
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
|
|
@ -13,12 +14,16 @@ import java.util.Arrays
|
|||
import java.util.Locale
|
||||
import java.util.Optional
|
||||
import java.util.UUID
|
||||
|
||||
import java.util.logging.FileHandler
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.Address
|
||||
import akka.actor.BootstrapSetup
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo
|
||||
import com.fasterxml.jackson.core.JsonFactory
|
||||
|
|
@ -28,15 +33,12 @@ import com.fasterxml.jackson.core.StreamReadFeature
|
|||
import com.fasterxml.jackson.core.StreamWriteFeature
|
||||
import com.fasterxml.jackson.core.`type`.TypeReference
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature
|
||||
import com.fasterxml.jackson.databind.JsonNode
|
||||
import com.fasterxml.jackson.databind.MapperFeature
|
||||
import com.fasterxml.jackson.databind.Module
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import com.fasterxml.jackson.databind.SerializationFeature
|
||||
import com.fasterxml.jackson.databind.exc.InvalidTypeIdException
|
||||
import com.fasterxml.jackson.databind.json.JsonMapper
|
||||
import com.fasterxml.jackson.databind.node.IntNode
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
|
||||
import com.fasterxml.jackson.module.scala.JsonScalaEnumeration
|
||||
import com.github.ghik.silencer.silent
|
||||
|
|
@ -45,11 +47,7 @@ import org.scalatest.BeforeAndAfterAll
|
|||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.wordspec.AnyWordSpecLike
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.Address
|
||||
import akka.actor.BootstrapSetup
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import akka.actor.Status
|
||||
import akka.actor.setup.ActorSystemSetup
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
|
|
@ -85,6 +83,7 @@ object ScalaTestMessages {
|
|||
|
||||
final case class Event1(field1: String) extends TestMessage
|
||||
final case class Event2(field1V2: String, field2: Int) extends TestMessage
|
||||
final case class Event3(field1V2: String, field3: Int) extends TestMessage
|
||||
|
||||
final case class Zoo(first: Animal) extends TestMessage
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||
|
|
@ -117,21 +116,6 @@ object ScalaTestMessages {
|
|||
|
||||
}
|
||||
|
||||
class ScalaTestEventMigration extends JacksonMigration {
|
||||
override def currentVersion = 3
|
||||
|
||||
override def transformClassName(fromVersion: Int, className: String): String =
|
||||
classOf[ScalaTestMessages.Event2].getName
|
||||
|
||||
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
|
||||
val root = json.asInstanceOf[ObjectNode]
|
||||
root.set[JsonNode]("field1V2", root.get("field1"))
|
||||
root.remove("field1")
|
||||
root.set[JsonNode]("field2", IntNode.valueOf(17))
|
||||
root
|
||||
}
|
||||
}
|
||||
|
||||
class JacksonCborSerializerSpec extends JacksonSerializerSpec("jackson-cbor") {
|
||||
"have compression disabled by default" in {
|
||||
val conf = JacksonObjectMapperProvider.configForBinding("jackson-cbor", system.settings.config)
|
||||
|
|
@ -621,17 +605,8 @@ class JacksonJsonSerializerSpec extends JacksonSerializerSpec("jackson-json") {
|
|||
}
|
||||
}
|
||||
|
||||
abstract class JacksonSerializerSpec(serializerName: String)
|
||||
extends TestKit(
|
||||
ActorSystem(
|
||||
"JacksonJsonSerializerSpec",
|
||||
ConfigFactory.parseString(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event1" = "akka.serialization.jackson.JavaTestEventMigration"
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigration"
|
||||
"akka.serialization.jackson.ScalaTestMessages$$Event1" = "akka.serialization.jackson.ScalaTestEventMigration"
|
||||
"akka.serialization.jackson.ScalaTestMessages$$Event2" = "akka.serialization.jackson.ScalaTestEventMigration"
|
||||
}
|
||||
object JacksonSerializerSpec {
|
||||
def baseConfig(serializerName: String): String = s"""
|
||||
akka.actor {
|
||||
serialization-bindings {
|
||||
"akka.serialization.jackson.ScalaTestMessages$$TestMessage" = $serializerName
|
||||
|
|
@ -639,7 +614,14 @@ abstract class JacksonSerializerSpec(serializerName: String)
|
|||
}
|
||||
}
|
||||
akka.serialization.jackson.allowed-class-prefix = ["akka.serialization.jackson.ScalaTestMessages$$OldCommand"]
|
||||
""")))
|
||||
"""
|
||||
}
|
||||
|
||||
abstract class JacksonSerializerSpec(serializerName: String)
|
||||
extends TestKit(
|
||||
ActorSystem(
|
||||
"JacksonJsonSerializerSpec",
|
||||
ConfigFactory.parseString(JacksonSerializerSpec.baseConfig(serializerName))))
|
||||
with AnyWordSpecLike
|
||||
with Matchers
|
||||
with BeforeAndAfterAll {
|
||||
|
|
@ -772,22 +754,138 @@ abstract class JacksonSerializerSpec(serializerName: String)
|
|||
}
|
||||
}
|
||||
|
||||
"deserialize with migrations" in {
|
||||
// TODO: Consider moving the migrations Specs to a separate Spec
|
||||
"deserialize with migrations" in withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
## Usually the key is a FQCN but we're hacking the name to use multiple migrations for the
|
||||
## same type in a single test.
|
||||
"deserialize-Java.Event1-into-Java.Event3" = "akka.serialization.jackson.JavaTestEventMigrationV3"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sys =>
|
||||
val event1 = new Event1("a")
|
||||
val serializer = serializerFor(event1)
|
||||
val serializer = serializerFor(event1, sys)
|
||||
val blob = serializer.toBinary(event1)
|
||||
val event2 = serializer.fromBinary(blob, classOf[Event1].getName).asInstanceOf[Event2]
|
||||
event1.getField1 should ===(event2.getField1V2)
|
||||
event2.getField2 should ===(17)
|
||||
|
||||
// Event1 has no migration configured so it uses the default manifest name (with no version)
|
||||
serializer.manifest(event1) should ===(classOf[Event1].getName)
|
||||
|
||||
// Hack the manifest to enforce the use a particular migration when deserializing the blob of Event1
|
||||
val event3 = serializer.fromBinary(blob, "deserialize-Java.Event1-into-Java.Event3").asInstanceOf[Event3]
|
||||
event1.getField1 should ===(event3.getField1V2)
|
||||
event3.getField3 should ===(17)
|
||||
}
|
||||
|
||||
"deserialize with migrations from V2" in {
|
||||
// produce a blob/manifest from an ActorSystem without migrations
|
||||
val event1 = new Event1("a")
|
||||
val serializer = serializerFor(event1)
|
||||
val blob = serializer.toBinary(event1)
|
||||
val event2 = serializer.fromBinary(blob, classOf[Event1].getName + "#2").asInstanceOf[Event2]
|
||||
val manifest = serializer.manifest(event1)
|
||||
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event1" = "akka.serialization.jackson.JavaTestEventMigrationV2"
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigrationV2"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 =>
|
||||
// read the blob/manifest from an ActorSystem with migrations
|
||||
val serializerV2: JacksonSerializer = serializerFor(event1, sysV2)
|
||||
val event2 = serializerV2.fromBinary(blob, manifest).asInstanceOf[Event2]
|
||||
event1.getField1 should ===(event2.getField1V2)
|
||||
event2.getField2 should ===(17)
|
||||
|
||||
// Event2 has a migration configured so it uses a manifest with a version
|
||||
val serializerFor2 = serializerFor(event2, sysV2)
|
||||
serializerFor2.manifest(event2) should ===(classOf[Event2].getName + "#2")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"use the migration's currentVersion on new serializations" in {
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigrationV2"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 =>
|
||||
val event2 = new Event2("a", 17)
|
||||
// Event2 has a migration configured so it uses a manifest with a version
|
||||
val serializer2 = serializerFor(event2, sysV2)
|
||||
serializer2.manifest(event2) should ===(classOf[Event2].getName + "#2")
|
||||
}
|
||||
}
|
||||
|
||||
"use the migration's currentVersion on new serializations when supporting forward versions" in {
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigrationV2WithV3"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 =>
|
||||
val event2 = new Event2("a", 17)
|
||||
// Event2 has a migration configured so it uses a manifest with a version
|
||||
val serializer2 = serializerFor(event2, sysV2)
|
||||
serializer2.manifest(event2) should ===(classOf[Event2].getName + "#2")
|
||||
}
|
||||
}
|
||||
|
||||
"deserialize a V3 blob into a V2 class (forward-one support) and back" in {
|
||||
|
||||
val blobV3 =
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event3" = "akka.serialization.jackson.JavaTestEventMigrationV3"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV3 =>
|
||||
val event3 = new Event3("Steve", 49)
|
||||
val serializer = serializerFor(event3, sysV3)
|
||||
val blob = serializer.toBinary(event3)
|
||||
blob
|
||||
}
|
||||
|
||||
val blobV2 =
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigrationV2WithV3"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2WithV3 =>
|
||||
val serializerForEvent2 =
|
||||
serialization(sysV2WithV3).serializerFor(classOf[Event2]).asInstanceOf[JacksonSerializer]
|
||||
val event2 = serializerForEvent2.fromBinary(blobV3, classOf[Event2].getName + "#3").asInstanceOf[Event2]
|
||||
event2.getField1V2 should ===("Steve")
|
||||
event2.getField2 should ===(49)
|
||||
serializerForEvent2.toBinary(event2)
|
||||
}
|
||||
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event3" = "akka.serialization.jackson.JavaTestEventMigrationV3"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV3 =>
|
||||
val serializerForEvent3 = serialization(sysV3).serializerFor(classOf[Event3]).asInstanceOf[JacksonSerializer]
|
||||
val event3 = serializerForEvent3.fromBinary(blobV2, classOf[Event3].getName + "#2").asInstanceOf[Event3]
|
||||
event3.getField1V2 should ===("Steve")
|
||||
event3.getField3 should ===(49)
|
||||
}
|
||||
}
|
||||
|
||||
"deserialize unsupported versions throws an exception" in {
|
||||
intercept[lang.IllegalStateException] {
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event1" = "akka.serialization.jackson.JavaTestEventMigrationV2"
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigrationV2"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 =>
|
||||
// produce a blob/manifest from an ActorSystem without migrations
|
||||
val event1 = new Event1("a")
|
||||
val serializer = serializerFor(event1)
|
||||
val blob = serializer.toBinary(event1)
|
||||
val manifest = serializer.manifest(event1)
|
||||
// Event1 has no migration configured so it uses the default manifest name (with no version)
|
||||
val serializerV2: JacksonSerializer = serializerFor(event1, sysV2)
|
||||
serializerV2.fromBinary(blob, manifest + "#9").asInstanceOf[Event2]
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -882,22 +980,125 @@ abstract class JacksonSerializerSpec(serializerName: String)
|
|||
}
|
||||
}
|
||||
|
||||
"deserialize with migrations" in {
|
||||
// TODO: Consider moving the migrations Specs to a separate Spec
|
||||
"deserialize with migrations" in withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
## Usually the key is a FQCN but we're hacking the name to use multiple migrations for the
|
||||
## same type in a single test.
|
||||
"deserialize-Event1-into-Event3" = "akka.serialization.jackson.ScalaTestEventMigrationV3"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sys =>
|
||||
val event1 = Event1("a")
|
||||
val serializer = serializerFor(event1)
|
||||
val serializer = serializerFor(event1, sys)
|
||||
val blob = serializer.toBinary(event1)
|
||||
val event2 = serializer.fromBinary(blob, classOf[Event1].getName).asInstanceOf[Event2]
|
||||
event1.field1 should ===(event2.field1V2)
|
||||
event2.field2 should ===(17)
|
||||
|
||||
// Event1 has no migration configured so it uses the default manifest name (with no version)
|
||||
serializer.manifest(event1) should ===(classOf[Event1].getName)
|
||||
|
||||
// Hack the manifest to enforce the use a particular migration when deserializing the blob of Event1
|
||||
val event3 = serializer.fromBinary(blob, "deserialize-Event1-into-Event3").asInstanceOf[Event3]
|
||||
event1.field1 should ===(event3.field1V2)
|
||||
event3.field3 should ===(17)
|
||||
}
|
||||
|
||||
"deserialize with migrations from V2" in {
|
||||
// produce a blob/manifest from an ActorSystem without migrations
|
||||
val event1 = Event1("a")
|
||||
val serializer = serializerFor(event1)
|
||||
val blob = serializer.toBinary(event1)
|
||||
val event2 = serializer.fromBinary(blob, classOf[Event1].getName + "#2").asInstanceOf[Event2]
|
||||
val manifest = serializer.manifest(event1)
|
||||
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.ScalaTestMessages$$Event1" = "akka.serialization.jackson.ScalaTestEventMigrationV2"
|
||||
"akka.serialization.jackson.ScalaTestMessages$$Event2" = "akka.serialization.jackson.ScalaTestEventMigrationV2"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 =>
|
||||
// read the blob/manifest from an ActorSystem with migrations
|
||||
val serializerV2: JacksonSerializer = serializerFor(event1, sysV2)
|
||||
val event2 = serializerV2.fromBinary(blob, manifest).asInstanceOf[Event2]
|
||||
event1.field1 should ===(event2.field1V2)
|
||||
event2.field2 should ===(17)
|
||||
|
||||
// Event2 has a migration configured so it uses a manifest with a version
|
||||
val serializerFor2 = serializerFor(event2, sysV2)
|
||||
serializerFor2.manifest(event2) should ===(classOf[Event2].getName + "#2")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"use the migration's currentVersion on new serializations" in {
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.ScalaTestMessages$$Event2" = "akka.serialization.jackson.ScalaTestEventMigrationV2"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 =>
|
||||
val event2 = new Event2("a", 17)
|
||||
// Event2 has a migration configured so it uses a manifest with a version
|
||||
val serializer2 = serializerFor(event2, sysV2)
|
||||
serializer2.manifest(event2) should ===(classOf[Event2].getName + "#2")
|
||||
}
|
||||
}
|
||||
|
||||
"deserialize a V3 blob into a V2 class (forward-one support) and back" in {
|
||||
|
||||
val blobV3 =
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.ScalaTestMessages$$Event3" = "akka.serialization.jackson.ScalaTestEventMigrationV3"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV3 =>
|
||||
val event3 = new Event3("Steve", 49)
|
||||
val serializer = serializerFor(event3, sysV3)
|
||||
val blob = serializer.toBinary(event3)
|
||||
blob
|
||||
}
|
||||
|
||||
val blobV2 =
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.ScalaTestMessages$$Event2" = "akka.serialization.jackson.ScalaTestEventMigrationV2WithV3"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2WithV3 =>
|
||||
val serializerForEvent2 =
|
||||
serialization(sysV2WithV3).serializerFor(classOf[Event2]).asInstanceOf[JacksonSerializer]
|
||||
val event2 = serializerForEvent2.fromBinary(blobV3, classOf[Event2].getName + "#3").asInstanceOf[Event2]
|
||||
event2.field1V2 should ===("Steve")
|
||||
event2.field2 should ===(49)
|
||||
serializerForEvent2.toBinary(event2)
|
||||
}
|
||||
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.ScalaTestMessages$$Event3" = "akka.serialization.jackson.ScalaTestEventMigrationV3"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV3 =>
|
||||
val serializerForEvent3 = serialization(sysV3).serializerFor(classOf[Event3]).asInstanceOf[JacksonSerializer]
|
||||
val event3 = serializerForEvent3.fromBinary(blobV2, classOf[Event3].getName + "#2").asInstanceOf[Event3]
|
||||
event3.field1V2 should ===("Steve")
|
||||
event3.field3 should ===(49)
|
||||
}
|
||||
}
|
||||
|
||||
"deserialize unsupported versions throws an exception" in {
|
||||
intercept[lang.IllegalStateException] {
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.ScalaTestMessages$$Event1" = "akka.serialization.jackson.ScalaTestEventMigrationV2"
|
||||
"akka.serialization.jackson.ScalaTestMessages$$Event2" = "akka.serialization.jackson.ScalaTestEventMigrationV2"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 =>
|
||||
// produce a blob/manifest from an ActorSystem without migrations
|
||||
val event1 = new Event1("a")
|
||||
val serializer = serializerFor(event1)
|
||||
val blob = serializer.toBinary(event1)
|
||||
val manifest = serializer.manifest(event1)
|
||||
// Event1 has no migration configured so it uses the default manifest name (with no version)
|
||||
val serializerV2: JacksonSerializer = serializerFor(event1, sysV2)
|
||||
serializerV2.fromBinary(blob, manifest + "#9").asInstanceOf[Event2]
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
"not allow serialization of deny listed class" in {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,94 @@
|
|||
/*
|
||||
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.serialization.jackson
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode
|
||||
import com.fasterxml.jackson.databind.node.IntNode
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode
|
||||
|
||||
object ScalaTestEventMigration {
|
||||
def upcastV1ToV2(root: ObjectNode): ObjectNode = {
|
||||
root.set[JsonNode]("field1V2", root.get("field1"))
|
||||
root.remove("field1")
|
||||
root.set[JsonNode]("field2", IntNode.valueOf(17))
|
||||
root
|
||||
}
|
||||
|
||||
def upcastV2ToV3(root: ObjectNode): ObjectNode = {
|
||||
root.set("field3", root.get("field2"))
|
||||
root.remove("field2")
|
||||
root
|
||||
}
|
||||
|
||||
def downcastV3ToV2(root: ObjectNode) = {
|
||||
// downcast the V3 representation to the V2 representation. A field
|
||||
// is renamed.
|
||||
root.set("field2", root.get("field3"))
|
||||
root.remove("field3")
|
||||
root
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class ScalaTestEventMigrationV2 extends JacksonMigration {
|
||||
import ScalaTestEventMigration._
|
||||
|
||||
override def currentVersion = 2
|
||||
|
||||
override def transformClassName(fromVersion: Int, className: String): String =
|
||||
classOf[ScalaTestMessages.Event2].getName
|
||||
|
||||
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
|
||||
val root = json.asInstanceOf[ObjectNode]
|
||||
upcastV1ToV2(root)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class ScalaTestEventMigrationV2WithV3 extends JacksonMigration {
|
||||
import ScalaTestEventMigration._
|
||||
|
||||
override def currentVersion = 2
|
||||
|
||||
override def supportedForwardVersion: Int = 3
|
||||
|
||||
// Always produce the type of the currentVersion. When fromVersion is lower,
|
||||
// transform will lift it. When fromVersion is higher, transform will downcast it.
|
||||
override def transformClassName(fromVersion: Int, className: String): String =
|
||||
classOf[ScalaTestMessages.Event2].getName
|
||||
|
||||
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
|
||||
var root = json.asInstanceOf[ObjectNode]
|
||||
if (fromVersion < 2) {
|
||||
root = upcastV1ToV2(root)
|
||||
}
|
||||
if (fromVersion == 3) {
|
||||
root = downcastV3ToV2(root)
|
||||
}
|
||||
root
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class ScalaTestEventMigrationV3 extends JacksonMigration {
|
||||
import ScalaTestEventMigration._
|
||||
|
||||
override def currentVersion = 3
|
||||
|
||||
override def transformClassName(fromVersion: Int, className: String): String =
|
||||
classOf[ScalaTestMessages.Event3].getName
|
||||
|
||||
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
|
||||
var root = json.asInstanceOf[ObjectNode]
|
||||
if (fromVersion < 2) {
|
||||
root = upcastV1ToV2(root)
|
||||
}
|
||||
if (fromVersion < 3) {
|
||||
root = upcastV2ToV3(root)
|
||||
}
|
||||
root
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -7,5 +7,7 @@ package doc.akka.serialization.jackson.v1
|
|||
import doc.akka.serialization.jackson.MySerializable
|
||||
|
||||
// #add-optional
|
||||
// #forward-one-rename
|
||||
case class ItemAdded(shoppingCartId: String, productId: String, quantity: Int) extends MySerializable
|
||||
// #forward-one-rename
|
||||
// #add-optional
|
||||
|
|
|
|||
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package doc.akka.serialization.jackson.v1withv2
|
||||
|
||||
// #forward-one-rename
|
||||
import akka.serialization.jackson.JacksonMigration
|
||||
import com.fasterxml.jackson.databind.JsonNode
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode
|
||||
|
||||
class ItemAddedMigration extends JacksonMigration {
|
||||
|
||||
// Data produced in this node is still produced using the version 1 of the schema
|
||||
override def currentVersion: Int = 1
|
||||
|
||||
override def supportedForwardVersion: Int = 2
|
||||
|
||||
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
|
||||
val root = json.asInstanceOf[ObjectNode]
|
||||
if (fromVersion == 2) {
|
||||
// When receiving an event of version 2 we down-cast it to the version 1 of the schema
|
||||
root.set[JsonNode]("productId", root.get("itemId"))
|
||||
root.remove("itemId")
|
||||
}
|
||||
root
|
||||
}
|
||||
}
|
||||
// #forward-one-rename
|
||||
Loading…
Add table
Add a link
Reference in a new issue