=per #16009 Serialize SnapshotHeader without Java serialization
* it is backwards compatible, i.e. it can read old snapshots (cherry picked from commit 690905eac9619da91d7b1fcbc633d1fa2e411ee7) Conflicts: akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala
This commit is contained in:
parent
dd76120376
commit
c3f16ad740
2 changed files with 50 additions and 7 deletions
|
|
@ -9,6 +9,8 @@ import java.io._
|
|||
import akka.actor._
|
||||
import akka.serialization.{ Serializer, SerializationExtension }
|
||||
import akka.serialization.Serialization
|
||||
import scala.util.Success
|
||||
import scala.util.Failure
|
||||
|
||||
/**
|
||||
* Wrapper for snapshot `data`. Snapshot `data` are the actual snapshot objects captured by
|
||||
|
|
@ -59,11 +61,12 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer {
|
|||
val extension = SerializationExtension(system)
|
||||
|
||||
val snapshotSerializer = extension.findSerializerFor(snapshot)
|
||||
val snapshotManifest = if (snapshotSerializer.includeManifest) Some(snapshot.getClass.getName) else None
|
||||
|
||||
val header = SnapshotHeader(snapshotSerializer.identifier, snapshotManifest)
|
||||
val headerSerializer = extension.findSerializerFor(header)
|
||||
val headerBytes = headerSerializer.toBinary(header)
|
||||
val headerOut = new ByteArrayOutputStream
|
||||
writeInt(headerOut, snapshotSerializer.identifier)
|
||||
if (snapshotSerializer.includeManifest)
|
||||
headerOut.write(snapshot.getClass.getName.getBytes("utf-8"))
|
||||
val headerBytes = headerOut.toByteArray
|
||||
|
||||
val out = new ByteArrayOutputStream
|
||||
|
||||
|
|
@ -88,7 +91,25 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer {
|
|||
val headerBytes = bytes.slice(4, headerLength + 4)
|
||||
val snapshotBytes = bytes.drop(headerLength + 4)
|
||||
|
||||
val header = extension.deserialize(headerBytes, classOf[SnapshotHeader]).get
|
||||
// If we are allowed to break serialization compatibility of stored snapshots in 2.4
|
||||
// we can remove this attempt of deserialize SnapshotHeader with JavaSerializer.
|
||||
// Then the class SnapshotHeader can be removed. See isseue #16009
|
||||
val header = extension.deserialize(headerBytes, classOf[SnapshotHeader]) match {
|
||||
case Success(header) ⇒
|
||||
header
|
||||
case Failure(_) ⇒
|
||||
val headerIn = new ByteArrayInputStream(headerBytes)
|
||||
val serializerId = readInt(headerIn)
|
||||
val remaining = headerIn.available
|
||||
val manifest =
|
||||
if (remaining == 0) None
|
||||
else {
|
||||
val manifestBytes = Array.ofDim[Byte](remaining)
|
||||
headerIn.read(manifestBytes)
|
||||
Some(new String(manifestBytes, "utf-8"))
|
||||
}
|
||||
SnapshotHeader(serializerId, manifest)
|
||||
}
|
||||
val manifest = header.manifest.map(system.dynamicAccess.getClassFor[AnyRef](_).get)
|
||||
|
||||
extension.deserialize[AnyRef](snapshotBytes, header.serializerId, manifest).get
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot
|
|||
import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration.Duration
|
||||
import org.apache.commons.codec.binary.Hex.decodeHex
|
||||
|
||||
object SerializerSpecConfigs {
|
||||
val customSerializers = ConfigFactory.parseString(
|
||||
|
|
@ -69,6 +70,27 @@ class SnapshotSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
|
|||
|
||||
deserialized should be(Snapshot(MySnapshot(".a.")))
|
||||
}
|
||||
|
||||
"be able to read snapshot created with akka 2.3.6" in {
|
||||
val dataStr = "abc"
|
||||
val snapshot = Snapshot(dataStr.getBytes("utf-8"))
|
||||
val serializer = serialization.findSerializerFor(snapshot)
|
||||
|
||||
// the oldSnapshot was created with Akka 2.3.6 and it is using JavaSerialization
|
||||
// for the SnapshotHeader. See issue #16009.
|
||||
// It was created with:
|
||||
// println(s"encoded snapshot: " + String.valueOf(encodeHex(serializer.toBinary(snapshot))))
|
||||
val oldSnapshot = "a8000000aced00057372002d616b6b612e70657273697374656e63652e73657269616c697a6174696f6e" +
|
||||
"2e536e617073686f74486561646572000000000000000102000249000c73657269616c697a657249644c00086d616e696665" +
|
||||
"737474000e4c7363616c612f4f7074696f6e3b7870000000047372000b7363616c612e4e6f6e6524465024f653ca94ac0200" +
|
||||
"007872000c7363616c612e4f7074696f6ee36024a8328a45e90200007870616263"
|
||||
|
||||
val bytes = decodeHex(oldSnapshot.toCharArray)
|
||||
val deserialized = serializer.fromBinary(bytes, None).asInstanceOf[Snapshot]
|
||||
|
||||
val deserializedDataStr = new String(deserialized.data.asInstanceOf[Array[Byte]], "utf-8")
|
||||
dataStr should be(deserializedDataStr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue