diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala index 6d34ba5ab9..cea4366907 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala @@ -87,28 +87,57 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer { private def snapshotFromBinary(bytes: Array[Byte]): AnyRef = { val extension = SerializationExtension(system) - val headerLength = readInt(new ByteArrayInputStream(bytes)) + val in = new ByteArrayInputStream(bytes) + val headerLength = readInt(in) val headerBytes = bytes.slice(4, headerLength + 4) val snapshotBytes = bytes.drop(headerLength + 4) + /* + * Attempt to find the `key` in the supplied array and if successful + * construct a new array, replacing the contained UID with the `replacement`. + */ + def patch(b: Array[Byte]): Array[Byte] = { + import SnapshotSerializer._ + def find(pos: Int, offset: Int): Int = { + if (pos == b.length) -1 + else if (offset == key.length) pos + else if (b(pos + offset) == key(offset)) find(pos, offset + 1) + else find(pos + 1, 0) + } + val found = find(0, 0) + if (found == -1) b + else { + val n = new Array[Byte](b.length) + val start = found + offset + val end = start + replacement.length + System.arraycopy(b, 0, n, 0, start) + System.arraycopy(replacement, 0, n, start, replacement.length) + System.arraycopy(b, end, n, end, b.length - end) + n + } + } + // If we are allowed to break serialization compatibility of stored snapshots in 2.4 - // we can remove this attempt of deserialize SnapshotHeader with JavaSerializer. - // Then the class SnapshotHeader can be removed. See isseue #16009 - val header = extension.deserialize(headerBytes, classOf[SnapshotHeader]) match { - case Success(header) ⇒ - header - case Failure(_) ⇒ - val headerIn = new ByteArrayInputStream(headerBytes) - val serializerId = readInt(headerIn) - val remaining = headerIn.available - val manifest = - if (remaining == 0) None - else { - val manifestBytes = Array.ofDim[Byte](remaining) - headerIn.read(manifestBytes) - Some(new String(manifestBytes, "utf-8")) - } - SnapshotHeader(serializerId, manifest) + // we can remove this attempt to deserialize SnapshotHeader with JavaSerializer. + // Then the class SnapshotHeader can be removed. See issue #16009 + val oldHeader = + if (readShort(in) == 0xedac) { // Java Serialization magic value with swapped bytes + val b = if (SnapshotSerializer.doPatch) patch(headerBytes) else headerBytes + extension.deserialize(b, classOf[SnapshotHeader]).toOption + } else None + + val header = oldHeader.getOrElse { + val headerIn = new ByteArrayInputStream(headerBytes) + val serializerId = readInt(headerIn) + val remaining = headerIn.available + val manifest = + if (remaining == 0) None + else { + val manifestBytes = Array.ofDim[Byte](remaining) + headerIn.read(manifestBytes) + Some(new String(manifestBytes, "utf-8")) + } + SnapshotHeader(serializerId, manifest) } val manifest = header.manifest.map(system.dynamicAccess.getClassFor[AnyRef](_).get) @@ -118,6 +147,63 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer { private def writeInt(outputStream: OutputStream, i: Int) = 0 to 24 by 8 foreach { shift ⇒ outputStream.write(i >> shift) } - private def readInt(inputStream: InputStream) = - (0 to 24 by 8).foldLeft(0) { (id, shift) ⇒ (id | (inputStream.read() << shift)) } + private def readShort(inputStream: InputStream) = { + val ch1 = inputStream.read() + val ch2 = inputStream.read() + (ch2 << 8) | ch1 + } + + private def readInt(inputStream: InputStream) = { + val sh1 = readShort(inputStream) + val sh2 = readShort(inputStream) + (sh2 << 16) | sh1 + } } + +object SnapshotSerializer { + /* + * This is the serialized form of Class[Option[_]] (as a superclass, hence + * the leading 0x78) with Scala 2.10. + */ + val key: Array[Byte] = Array( + 0x78, 0x72, 0x00, 0x0c, 0x73, 0x63, 0x61, 0x6c, + 0x61, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, + 0xe3, 0x60, 0x24, 0xa8, 0x32, 0x8a, 0x45, 0xe9, // here is the UID + 0x02, 0x00, 0x00, 0x78, 0x70).map(_.toByte) + // The offset of the serialVersionUID in the above. + val offset = 16 + // This is the new serialVersionUID from Scala 2.11 onwards. + val replacement: Array[Byte] = Array( + 0xfe, 0x69, 0x37, 0xfd, 0xdb, 0x0e, 0x66, 0x74).map(_.toByte) + val doPatch: Boolean = { + /* + * The only way to obtain the serialVersionUID for the Option[_] class is + * to serialize one and look at the result, since prior to 2.11.5 it does + * not declare such a field and relies on automatic UID generation. + */ + val baus = new ByteArrayOutputStream() + val out = new ObjectOutputStream(baus) + out.writeObject(None) + // the first 30 bytes serialize the None object, then comes its superclass Option[_] + val superOffset = 30 + val uidOffset = superOffset + offset + val clazz: Seq[Byte] = baus.toByteArray.slice(superOffset, uidOffset) + val knownClazz: Seq[Byte] = key.take(offset) + val uid: Seq[Byte] = baus.toByteArray.slice(uidOffset, uidOffset + replacement.length) + // only attempt to patch if we know the target class + if (clazz == knownClazz) { + if (uid == replacement.toSeq) { + // running on 2.11 + true + } else if (uid == (key.slice(offset, offset + replacement.length): Seq[Byte])) { + // running on 2.10, need to switch out UID between key and replacement + val len = replacement.length + val tmp = new Array[Byte](len) + System.arraycopy(replacement, 0, tmp, 0, len) + System.arraycopy(key, offset, replacement, 0, len) + System.arraycopy(tmp, 0, key, offset, len) + true + } else false + } else false + } +} \ No newline at end of file diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala index 8c753639f0..d413bccbc6 100644 --- a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala @@ -71,7 +71,7 @@ class SnapshotSerializerPersistenceSpec extends AkkaSpec(customSerializers) { deserialized should be(Snapshot(MySnapshot(".a."))) } - "be able to read snapshot created with akka 2.3.6" in { + "be able to read snapshot created with akka 2.3.6 and Scala 2.10" in { val dataStr = "abc" val snapshot = Snapshot(dataStr.getBytes("utf-8")) val serializer = serialization.findSerializerFor(snapshot) @@ -80,10 +80,37 @@ class SnapshotSerializerPersistenceSpec extends AkkaSpec(customSerializers) { // for the SnapshotHeader. See issue #16009. // It was created with: // println(s"encoded snapshot: " + String.valueOf(encodeHex(serializer.toBinary(snapshot)))) - val oldSnapshot = "a8000000aced00057372002d616b6b612e70657273697374656e63652e73657269616c697a6174696f6e" + - "2e536e617073686f74486561646572000000000000000102000249000c73657269616c697a657249644c00086d616e696665" + - "737474000e4c7363616c612f4f7074696f6e3b7870000000047372000b7363616c612e4e6f6e6524465024f653ca94ac0200" + - "007872000c7363616c612e4f7074696f6ee36024a8328a45e90200007870616263" + val oldSnapshot = // 32 bytes per line + "a8000000aced00057372002d616b6b612e70657273697374656e63652e736572" + + "69616c697a6174696f6e2e536e617073686f7448656164657200000000000000" + + "0102000249000c73657269616c697a657249644c00086d616e69666573747400" + + "0e4c7363616c612f4f7074696f6e3b7870000000047372000b7363616c612e4e" + + "6f6e6524465024f653ca94ac0200007872000c7363616c612e4f7074696f6ee3" + + "6024a8328a45e90200007870616263" + + val bytes = decodeHex(oldSnapshot.toCharArray) + val deserialized = serializer.fromBinary(bytes, None).asInstanceOf[Snapshot] + + val deserializedDataStr = new String(deserialized.data.asInstanceOf[Array[Byte]], "utf-8") + dataStr should be(deserializedDataStr) + } + + "be able to read snapshot created with akka 2.3.6 and Scala 2.11" in { + val dataStr = "abc" + val snapshot = Snapshot(dataStr.getBytes("utf-8")) + val serializer = serialization.findSerializerFor(snapshot) + + // the oldSnapshot was created with Akka 2.3.6 and it is using JavaSerialization + // for the SnapshotHeader. See issue #16009. + // It was created with: + // println(s"encoded snapshot: " + String.valueOf(encodeHex(serializer.toBinary(snapshot)))) + val oldSnapshot = // 32 bytes per line + "a8000000aced00057372002d616b6b612e70657273697374656e63652e736572" + + "69616c697a6174696f6e2e536e617073686f7448656164657200000000000000" + + "0102000249000c73657269616c697a657249644c00086d616e69666573747400" + + "0e4c7363616c612f4f7074696f6e3b7870000000047372000b7363616c612e4e" + + "6f6e6524465024f653ca94ac0200007872000c7363616c612e4f7074696f6efe" + + "6937fddb0e66740200007870616263" val bytes = decodeHex(oldSnapshot.toCharArray) val deserialized = serializer.fromBinary(bytes, None).asInstanceOf[Snapshot] diff --git a/project/Dependencies.scala b/project/Dependencies.scala index c527b5b417..1eb46afdb5 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -8,7 +8,7 @@ object Dependencies { import DependencyHelpers.ScalaVersionDependentModuleID._ object Versions { - val crossScala = Seq("2.10.4", "2.11.4") + val crossScala = Seq("2.11.5", "2.10.4") val scalaVersion = crossScala.head val scalaStmVersion = sys.props.get("akka.build.scalaStmVersion").getOrElse("0.7") val scalaTestVersion = sys.props.get("akka.build.scalaTestVersion").getOrElse("2.1.3")