Add new migration strategy for Pekko Persistence snapshots (#1423)

* Add new migration strategy for Pekko Persistence snapshots

Update reference.conf

Update SnapshotSerializerSpec.scala

new impl

* add more tests

* add akka mode test

* rename file

* Update SnapshotSerializerMigrationAkkaSpec.scala

* extend akka test

* Update reference.conf

* Update SnapshotSerializer.scala

* rename config

* fix name of tests
This commit is contained in:
PJ Fanning 2024-08-21 11:58:16 +01:00 committed by GitHub
parent 9b8ddc808a
commit 0fa708382c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 222 additions and 18 deletions

View file

@ -2,7 +2,7 @@
#####################################
# Pekko Actor Reference Config File #
########################3############
#####################################
# This is the reference config file that contains all the default settings.
# Make your edits/overrides in your application.conf.

View file

@ -1,8 +1,8 @@
# SPDX-License-Identifier: Apache-2.0
###########################################################
############################################################
# Pekko Persistence Extension Reference Configuration File #
###########################################################
############################################################
# This is the reference config file that contains all the default settings.
# Make your edits in your application.conf in order to override these settings.
@ -42,6 +42,11 @@ pekko.persistence {
plugin = ""
# List of snapshot stores to start automatically. Use "" for the default snapshot store.
auto-start-snapshot-stores = []
# When migrating from using Akka Persistence to using Pekko Persistence,
# you may need to have the serializer handle Akka or Pekko created snapshots.
# Supported values are "pekko", "akka" and "no-migration".
# See https://cwiki.apache.org/confluence/display/PEKKO/Pekko+Akka+Compatibility
auto-migrate-manifest = "pekko"
}
# used as default-snapshot store if no plugin configured
# (see `pekko.persistence.snapshot-store`)

View file

@ -29,15 +29,39 @@ import pekko.util.ByteString.UTF_8
@SerialVersionUID(1L)
final case class Snapshot(data: Any)
private[serialization] sealed trait SnapshotAutoMigration
private[serialization] object SnapshotAutoMigration {
val ConfigName = "pekko.persistence.snapshot-store.auto-migrate-manifest"
// Ignore the snapshot migration strategy - means that Pekko will not be able to work with snapshots saved by Akka
object NoMigration extends SnapshotAutoMigration
// When saving snapshots, migrate any manifests with `akka` to `org.apache.pekko`
object Pekko extends SnapshotAutoMigration
// When saving snapshots, migrate any manifests with `org.apache.pekko` to `akka`
object Akka extends SnapshotAutoMigration
def fromString(s: String): SnapshotAutoMigration = s match {
case "no-migration" => NoMigration
case "pekko" => Pekko
case "akka" => Akka
case _ => throw new IllegalArgumentException(s"Unknown snapshot migration strategy: $s")
}
}
/**
* [[Snapshot]] serializer.
*/
class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
import SnapshotAutoMigration._
override val includeManifest: Boolean = false
private lazy val serialization = SerializationExtension(system)
private lazy val migrationStrategy = SnapshotAutoMigration.fromString(
system.settings.config.getString(ConfigName))
/**
* Serializes a [[Snapshot]]. Delegates serialization of snapshot `data` to a matching
* `org.apache.pekko.serialization.Serializer`.
@ -58,7 +82,7 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
val out = new ByteArrayOutputStream
writeInt(out, snapshotSerializer.identifier)
val ms = Serializers.manifestFor(snapshotSerializer, snapshot)
val ms = migrateManifestIfNecessary(Serializers.manifestFor(snapshotSerializer, snapshot))
if (ms.nonEmpty) out.write(ms.getBytes(UTF_8))
out.toByteArray
@ -77,11 +101,44 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
else {
val manifestBytes = new Array[Byte](remaining)
in.read(manifestBytes)
new String(manifestBytes, UTF_8)
migrateManifestToPekkoIfNecessary(new String(manifestBytes, UTF_8))
}
(serializerId, manifest)
}
// when writing the data, we want to allow the serialized data to
// support Akka and Pekko serializers as required by configuration
private def migrateManifestIfNecessary(manifest: String): String = {
migrationStrategy match {
case NoMigration => manifest
case Pekko =>
if (manifest.startsWith("akka")) {
manifest.replaceFirst("akka", "org.apache.pekko")
} else {
manifest
}
case Akka =>
if (manifest.startsWith("org.apache.pekko")) {
manifest.replaceFirst("org.apache.pekko", "akka")
} else {
manifest
}
}
}
// when reading the data, we want to force use of the Pekko serializer
private def migrateManifestToPekkoIfNecessary(manifest: String): String = {
migrationStrategy match {
case NoMigration => manifest
case _ =>
if (manifest.startsWith("akka")) {
manifest.replaceFirst("akka", "org.apache.pekko")
} else {
manifest
}
}
}
private def snapshotToBinary(snapshot: AnyRef): Array[Byte] = {
def serialize() = {
val snapshotSerializer = serialization.findSerializerFor(snapshot)
@ -112,14 +169,8 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
val (serializerId, manifest) = headerFromBinary(headerBytes)
// suggested in https://github.com/scullxbones/pekko-persistence-mongo/pull/14#issuecomment-1847223850
serialization
.deserialize(snapshotBytes, serializerId, manifest)
.recoverWith {
case _: NotSerializableException if manifest.startsWith("akka") =>
serialization
.deserialize(snapshotBytes, serializerId, manifest.replaceFirst("akka", "org.apache.pekko"))
}
.get
}

View file

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pekko.persistence.serialization
import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.fsm.PersistentFSM.PersistentFSMSnapshot
import pekko.serialization.SerializationExtension
import pekko.testkit.PekkoSpec
import java.io.NotSerializableException
import java.util.Base64
class SnapshotSerializerMigrationAkkaSpec extends PekkoSpec(
s"${SnapshotAutoMigration.ConfigName}=akka"
) {
import SnapshotSerializerTestData._
"Snapshot serializer with migration to Akka" should {
"deserialize akka snapshots" in {
val serialization = SerializationExtension(system)
val bytes = Base64.getDecoder.decode(akkaSnapshotData)
val result = serialization.deserialize(bytes, classOf[Snapshot]).get
val deserialized = result.data
deserialized shouldBe a[PersistentFSMSnapshot[_]]
val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]]
persistentFSMSnapshot shouldEqual fsmSnapshot
}
"deserialize pekko snapshots" in {
val serialization = SerializationExtension(system)
val bytes = serialization.serialize(Snapshot(fsmSnapshot)).get
val result = serialization.deserialize(bytes, classOf[Snapshot]).get
val deserialized = result.data
deserialized shouldBe a[PersistentFSMSnapshot[_]]
val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]]
persistentFSMSnapshot shouldEqual fsmSnapshot
}
"serialize snapshot with Akka class name" in {
val serialization = SerializationExtension(system)
val bytes = serialization.serialize(Snapshot(fsmSnapshot)).get
val cfg = ConfigFactory.parseString(s"${SnapshotAutoMigration.ConfigName}=no-migration")
.withFallback(system.settings.config)
val pekkoOnlySystem = ActorSystem("pekko-only-serialization", cfg)
try {
val pekkoOnlySerialization = SerializationExtension(pekkoOnlySystem)
intercept[NotSerializableException] {
pekkoOnlySerialization.deserialize(bytes, classOf[Snapshot]).get
}
} finally {
pekkoOnlySystem.terminate()
}
}
}
}

View file

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pekko.persistence.serialization
import org.apache.pekko
import pekko.persistence.fsm.PersistentFSM.PersistentFSMSnapshot
import pekko.serialization.SerializationExtension
import pekko.testkit.PekkoSpec
import java.io.NotSerializableException
import java.util.Base64
class SnapshotSerializerNoMigrationSpec extends PekkoSpec(
s"${SnapshotAutoMigration.ConfigName}=no-migration"
) {
import SnapshotSerializerTestData._
"Snapshot serializer with no migration" should {
"deserialize pekko snapshots" in {
val serialization = SerializationExtension(system)
val bytes = serialization.serialize(Snapshot(fsmSnapshot)).get
val result = serialization.deserialize(bytes, classOf[Snapshot]).get
val deserialized = result.data
deserialized shouldBe a[PersistentFSMSnapshot[_]]
val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]]
persistentFSMSnapshot shouldEqual fsmSnapshot
}
"fail to deserialize akka snapshots" in {
val serialization = SerializationExtension(system)
val bytes = Base64.getDecoder.decode(akkaSnapshotData)
intercept[NotSerializableException] {
serialization.deserialize(bytes, classOf[Snapshot]).get
}
}
}
}

View file

@ -18,28 +18,53 @@
package org.apache.pekko.persistence.serialization
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.fsm.PersistentFSM.PersistentFSMSnapshot
import pekko.serialization.SerializationExtension
import pekko.testkit.PekkoSpec
import java.util.Base64
private[serialization] object SnapshotSerializerTestData {
val fsmSnapshot = PersistentFSMSnapshot[String]("test-identifier", "test-data", None)
// https://github.com/apache/pekko/pull/837#issuecomment-1847320309
val akkaSnapshotData =
"PAAAAAcAAABha2thLnBlcnNpc3RlbmNlLmZzbS5QZXJzaXN0ZW50RlNNJFBlcnNpc3RlbnRGU01TbmFwc2hvdAoPdGVzdC1pZGVudGlmaWVyEg0IFBIJdGVzdC1kYXRh"
}
class SnapshotSerializerSpec extends PekkoSpec {
import SnapshotSerializerTestData._
"Snapshot serializer" should {
"deserialize akka snapshots" in {
val system = ActorSystem()
val serialization = SerializationExtension(system)
// https://github.com/apache/pekko/pull/837#issuecomment-1847320309
val data =
"PAAAAAcAAABha2thLnBlcnNpc3RlbmNlLmZzbS5QZXJzaXN0ZW50RlNNJFBlcnNpc3RlbnRGU01TbmFwc2hvdAoPdGVzdC1pZGVudGlmaWVyEg0IFBIJdGVzdC1kYXRh"
val bytes = Base64.getDecoder.decode(data)
val bytes = Base64.getDecoder.decode(akkaSnapshotData)
val result = serialization.deserialize(bytes, classOf[Snapshot]).get
val deserialized = result.data
deserialized shouldBe a[PersistentFSMSnapshot[_]]
val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]]
persistentFSMSnapshot shouldEqual PersistentFSMSnapshot[String]("test-identifier", "test-data", None)
persistentFSMSnapshot shouldEqual fsmSnapshot
}
"deserialize pekko snapshots" in {
val serialization = SerializationExtension(system)
val bytes = serialization.serialize(Snapshot(fsmSnapshot)).get
val result = serialization.deserialize(bytes, classOf[Snapshot]).get
val deserialized = result.data
deserialized shouldBe a[PersistentFSMSnapshot[_]]
val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]]
persistentFSMSnapshot shouldEqual fsmSnapshot
}
"deserialize pre-saved pekko snapshots" in {
val serialization = SerializationExtension(system)
// this is Pekko encoded snapshot based on https://github.com/apache/pekko/pull/837#issuecomment-1847320309
val pekkoSnapshotData =
"SAAAAAcAAABvcmcuYXBhY2hlLnBla2tvLnBlcnNpc3RlbmNlLmZzbS5QZXJzaXN0ZW50RlNNJFBlcnNpc3RlbnRGU01TbmFwc2hvdAoPdGVzdC1pZGVudGlmaWVyEg0IFBIJdGVzdC1kYXRh"
val bytes = Base64.getDecoder.decode(pekkoSnapshotData)
val result = serialization.deserialize(bytes, classOf[Snapshot]).get
val deserialized = result.data
deserialized shouldBe a[PersistentFSMSnapshot[_]]
val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]]
persistentFSMSnapshot shouldEqual fsmSnapshot
}
}
}