Merge pull request #16660 from akka/wip-16659-SnapshotHeader2.10-∂π
=per #16659 fix deserialization of SnapshotHeader AND SWITCH TO Scala 2.11.4
This commit is contained in:
commit
cedfaa35cc
3 changed files with 139 additions and 26 deletions
|
|
@ -87,17 +87,46 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer {
|
|||
private def snapshotFromBinary(bytes: Array[Byte]): AnyRef = {
|
||||
val extension = SerializationExtension(system)
|
||||
|
||||
val headerLength = readInt(new ByteArrayInputStream(bytes))
|
||||
val in = new ByteArrayInputStream(bytes)
|
||||
val headerLength = readInt(in)
|
||||
val headerBytes = bytes.slice(4, headerLength + 4)
|
||||
val snapshotBytes = bytes.drop(headerLength + 4)
|
||||
|
||||
/*
|
||||
* Attempt to find the `key` in the supplied array and if successful
|
||||
* construct a new array, replacing the contained UID with the `replacement`.
|
||||
*/
|
||||
def patch(b: Array[Byte]): Array[Byte] = {
|
||||
import SnapshotSerializer._
|
||||
def find(pos: Int, offset: Int): Int = {
|
||||
if (pos == b.length) -1
|
||||
else if (offset == key.length) pos
|
||||
else if (b(pos + offset) == key(offset)) find(pos, offset + 1)
|
||||
else find(pos + 1, 0)
|
||||
}
|
||||
val found = find(0, 0)
|
||||
if (found == -1) b
|
||||
else {
|
||||
val n = new Array[Byte](b.length)
|
||||
val start = found + offset
|
||||
val end = start + replacement.length
|
||||
System.arraycopy(b, 0, n, 0, start)
|
||||
System.arraycopy(replacement, 0, n, start, replacement.length)
|
||||
System.arraycopy(b, end, n, end, b.length - end)
|
||||
n
|
||||
}
|
||||
}
|
||||
|
||||
// If we are allowed to break serialization compatibility of stored snapshots in 2.4
|
||||
// we can remove this attempt of deserialize SnapshotHeader with JavaSerializer.
|
||||
// Then the class SnapshotHeader can be removed. See isseue #16009
|
||||
val header = extension.deserialize(headerBytes, classOf[SnapshotHeader]) match {
|
||||
case Success(header) ⇒
|
||||
header
|
||||
case Failure(_) ⇒
|
||||
// we can remove this attempt to deserialize SnapshotHeader with JavaSerializer.
|
||||
// Then the class SnapshotHeader can be removed. See issue #16009
|
||||
val oldHeader =
|
||||
if (readShort(in) == 0xedac) { // Java Serialization magic value with swapped bytes
|
||||
val b = if (SnapshotSerializer.doPatch) patch(headerBytes) else headerBytes
|
||||
extension.deserialize(b, classOf[SnapshotHeader]).toOption
|
||||
} else None
|
||||
|
||||
val header = oldHeader.getOrElse {
|
||||
val headerIn = new ByteArrayInputStream(headerBytes)
|
||||
val serializerId = readInt(headerIn)
|
||||
val remaining = headerIn.available
|
||||
|
|
@ -118,6 +147,63 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer {
|
|||
private def writeInt(outputStream: OutputStream, i: Int) =
|
||||
0 to 24 by 8 foreach { shift ⇒ outputStream.write(i >> shift) }
|
||||
|
||||
private def readInt(inputStream: InputStream) =
|
||||
(0 to 24 by 8).foldLeft(0) { (id, shift) ⇒ (id | (inputStream.read() << shift)) }
|
||||
private def readShort(inputStream: InputStream) = {
|
||||
val ch1 = inputStream.read()
|
||||
val ch2 = inputStream.read()
|
||||
(ch2 << 8) | ch1
|
||||
}
|
||||
|
||||
private def readInt(inputStream: InputStream) = {
|
||||
val sh1 = readShort(inputStream)
|
||||
val sh2 = readShort(inputStream)
|
||||
(sh2 << 16) | sh1
|
||||
}
|
||||
}
|
||||
|
||||
object SnapshotSerializer {
|
||||
/*
|
||||
* This is the serialized form of Class[Option[_]] (as a superclass, hence
|
||||
* the leading 0x78) with Scala 2.10.
|
||||
*/
|
||||
val key: Array[Byte] = Array(
|
||||
0x78, 0x72, 0x00, 0x0c, 0x73, 0x63, 0x61, 0x6c,
|
||||
0x61, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e,
|
||||
0xe3, 0x60, 0x24, 0xa8, 0x32, 0x8a, 0x45, 0xe9, // here is the UID
|
||||
0x02, 0x00, 0x00, 0x78, 0x70).map(_.toByte)
|
||||
// The offset of the serialVersionUID in the above.
|
||||
val offset = 16
|
||||
// This is the new serialVersionUID from Scala 2.11 onwards.
|
||||
val replacement: Array[Byte] = Array(
|
||||
0xfe, 0x69, 0x37, 0xfd, 0xdb, 0x0e, 0x66, 0x74).map(_.toByte)
|
||||
val doPatch: Boolean = {
|
||||
/*
|
||||
* The only way to obtain the serialVersionUID for the Option[_] class is
|
||||
* to serialize one and look at the result, since prior to 2.11.5 it does
|
||||
* not declare such a field and relies on automatic UID generation.
|
||||
*/
|
||||
val baus = new ByteArrayOutputStream()
|
||||
val out = new ObjectOutputStream(baus)
|
||||
out.writeObject(None)
|
||||
// the first 30 bytes serialize the None object, then comes its superclass Option[_]
|
||||
val superOffset = 30
|
||||
val uidOffset = superOffset + offset
|
||||
val clazz: Seq[Byte] = baus.toByteArray.slice(superOffset, uidOffset)
|
||||
val knownClazz: Seq[Byte] = key.take(offset)
|
||||
val uid: Seq[Byte] = baus.toByteArray.slice(uidOffset, uidOffset + replacement.length)
|
||||
// only attempt to patch if we know the target class
|
||||
if (clazz == knownClazz) {
|
||||
if (uid == replacement.toSeq) {
|
||||
// running on 2.11
|
||||
true
|
||||
} else if (uid == (key.slice(offset, offset + replacement.length): Seq[Byte])) {
|
||||
// running on 2.10, need to switch out UID between key and replacement
|
||||
val len = replacement.length
|
||||
val tmp = new Array[Byte](len)
|
||||
System.arraycopy(replacement, 0, tmp, 0, len)
|
||||
System.arraycopy(key, offset, replacement, 0, len)
|
||||
System.arraycopy(tmp, 0, key, offset, len)
|
||||
true
|
||||
} else false
|
||||
} else false
|
||||
}
|
||||
}
|
||||
|
|
@ -71,7 +71,7 @@ class SnapshotSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
|
|||
deserialized should be(Snapshot(MySnapshot(".a.")))
|
||||
}
|
||||
|
||||
"be able to read snapshot created with akka 2.3.6" in {
|
||||
"be able to read snapshot created with akka 2.3.6 and Scala 2.10" in {
|
||||
val dataStr = "abc"
|
||||
val snapshot = Snapshot(dataStr.getBytes("utf-8"))
|
||||
val serializer = serialization.findSerializerFor(snapshot)
|
||||
|
|
@ -80,10 +80,37 @@ class SnapshotSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
|
|||
// for the SnapshotHeader. See issue #16009.
|
||||
// It was created with:
|
||||
// println(s"encoded snapshot: " + String.valueOf(encodeHex(serializer.toBinary(snapshot))))
|
||||
val oldSnapshot = "a8000000aced00057372002d616b6b612e70657273697374656e63652e73657269616c697a6174696f6e" +
|
||||
"2e536e617073686f74486561646572000000000000000102000249000c73657269616c697a657249644c00086d616e696665" +
|
||||
"737474000e4c7363616c612f4f7074696f6e3b7870000000047372000b7363616c612e4e6f6e6524465024f653ca94ac0200" +
|
||||
"007872000c7363616c612e4f7074696f6ee36024a8328a45e90200007870616263"
|
||||
val oldSnapshot = // 32 bytes per line
|
||||
"a8000000aced00057372002d616b6b612e70657273697374656e63652e736572" +
|
||||
"69616c697a6174696f6e2e536e617073686f7448656164657200000000000000" +
|
||||
"0102000249000c73657269616c697a657249644c00086d616e69666573747400" +
|
||||
"0e4c7363616c612f4f7074696f6e3b7870000000047372000b7363616c612e4e" +
|
||||
"6f6e6524465024f653ca94ac0200007872000c7363616c612e4f7074696f6ee3" +
|
||||
"6024a8328a45e90200007870616263"
|
||||
|
||||
val bytes = decodeHex(oldSnapshot.toCharArray)
|
||||
val deserialized = serializer.fromBinary(bytes, None).asInstanceOf[Snapshot]
|
||||
|
||||
val deserializedDataStr = new String(deserialized.data.asInstanceOf[Array[Byte]], "utf-8")
|
||||
dataStr should be(deserializedDataStr)
|
||||
}
|
||||
|
||||
"be able to read snapshot created with akka 2.3.6 and Scala 2.11" in {
|
||||
val dataStr = "abc"
|
||||
val snapshot = Snapshot(dataStr.getBytes("utf-8"))
|
||||
val serializer = serialization.findSerializerFor(snapshot)
|
||||
|
||||
// the oldSnapshot was created with Akka 2.3.6 and it is using JavaSerialization
|
||||
// for the SnapshotHeader. See issue #16009.
|
||||
// It was created with:
|
||||
// println(s"encoded snapshot: " + String.valueOf(encodeHex(serializer.toBinary(snapshot))))
|
||||
val oldSnapshot = // 32 bytes per line
|
||||
"a8000000aced00057372002d616b6b612e70657273697374656e63652e736572" +
|
||||
"69616c697a6174696f6e2e536e617073686f7448656164657200000000000000" +
|
||||
"0102000249000c73657269616c697a657249644c00086d616e69666573747400" +
|
||||
"0e4c7363616c612f4f7074696f6e3b7870000000047372000b7363616c612e4e" +
|
||||
"6f6e6524465024f653ca94ac0200007872000c7363616c612e4f7074696f6efe" +
|
||||
"6937fddb0e66740200007870616263"
|
||||
|
||||
val bytes = decodeHex(oldSnapshot.toCharArray)
|
||||
val deserialized = serializer.fromBinary(bytes, None).asInstanceOf[Snapshot]
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ object Dependencies {
|
|||
import DependencyHelpers.ScalaVersionDependentModuleID._
|
||||
|
||||
object Versions {
|
||||
val crossScala = Seq("2.10.4", "2.11.4")
|
||||
val crossScala = Seq("2.11.5", "2.10.4")
|
||||
val scalaVersion = crossScala.head
|
||||
val scalaStmVersion = sys.props.get("akka.build.scalaStmVersion").getOrElse("0.7")
|
||||
val scalaTestVersion = sys.props.get("akka.build.scalaTestVersion").getOrElse("2.1.3")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue