diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala index 741b3dfe46..ca9c627fab 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala @@ -9,6 +9,8 @@ import java.io._ import akka.actor._ import akka.serialization.{ Serializer, SerializationExtension } import akka.serialization.Serialization +import scala.util.Success +import scala.util.Failure /** * Wrapper for snapshot `data`. Snapshot `data` are the actual snapshot objects captured by @@ -59,11 +61,12 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer { val extension = SerializationExtension(system) val snapshotSerializer = extension.findSerializerFor(snapshot) - val snapshotManifest = if (snapshotSerializer.includeManifest) Some(snapshot.getClass.getName) else None - val header = SnapshotHeader(snapshotSerializer.identifier, snapshotManifest) - val headerSerializer = extension.findSerializerFor(header) - val headerBytes = headerSerializer.toBinary(header) + val headerOut = new ByteArrayOutputStream + writeInt(headerOut, snapshotSerializer.identifier) + if (snapshotSerializer.includeManifest) + headerOut.write(snapshot.getClass.getName.getBytes("utf-8")) + val headerBytes = headerOut.toByteArray val out = new ByteArrayOutputStream @@ -74,7 +77,7 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer { out.toByteArray } - // serialize actor references with full address information (defaultAddress) + // serialize actor references with full address information (defaultAddress) transportInformation match { case Some(ti) ⇒ Serialization.currentTransportInformation.withValue(ti) { serialize() } case None ⇒ serialize() @@ -88,7 +91,25 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer { val headerBytes = bytes.slice(4, headerLength + 4) val snapshotBytes = bytes.drop(headerLength + 4) - val header = extension.deserialize(headerBytes, classOf[SnapshotHeader]).get + // If we are allowed to break serialization compatibility of stored snapshots in 2.4 + // we can remove this attempt of deserialize SnapshotHeader with JavaSerializer. + // Then the class SnapshotHeader can be removed. See isseue #16009 + val header = extension.deserialize(headerBytes, classOf[SnapshotHeader]) match { + case Success(header) ⇒ + header + case Failure(_) ⇒ + val headerIn = new ByteArrayInputStream(headerBytes) + val serializerId = readInt(headerIn) + val remaining = headerIn.available + val manifest = + if (remaining == 0) None + else { + val manifestBytes = Array.ofDim[Byte](remaining) + headerIn.read(manifestBytes) + Some(new String(manifestBytes, "utf-8")) + } + SnapshotHeader(serializerId, manifest) + } val manifest = header.manifest.map(system.dynamicAccess.getClassFor[AnyRef](_).get) extension.deserialize[AnyRef](snapshotBytes, header.serializerId, manifest).get diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala index 7fd31e2875..09680172ef 100644 --- a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala @@ -15,6 +15,7 @@ import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery import scala.concurrent.Await import scala.concurrent.duration.Duration +import org.apache.commons.codec.binary.Hex.decodeHex object SerializerSpecConfigs { val customSerializers = ConfigFactory.parseString( @@ -69,6 +70,27 @@ class SnapshotSerializerPersistenceSpec extends AkkaSpec(customSerializers) { deserialized should be(Snapshot(MySnapshot(".a."))) } + + "be able to read snapshot created with akka 2.3.6" in { + val dataStr = "abc" + val snapshot = Snapshot(dataStr.getBytes("utf-8")) + val serializer = serialization.findSerializerFor(snapshot) + + // the oldSnapshot was created with Akka 2.3.6 and it is using JavaSerialization + // for the SnapshotHeader. See issue #16009. + // It was created with: + // println(s"encoded snapshot: " + String.valueOf(encodeHex(serializer.toBinary(snapshot)))) + val oldSnapshot = "a8000000aced00057372002d616b6b612e70657273697374656e63652e73657269616c697a6174696f6e" + + "2e536e617073686f74486561646572000000000000000102000249000c73657269616c697a657249644c00086d616e696665" + + "737474000e4c7363616c612f4f7074696f6e3b7870000000047372000b7363616c612e4e6f6e6524465024f653ca94ac0200" + + "007872000c7363616c612e4f7074696f6ee36024a8328a45e90200007870616263" + + val bytes = decodeHex(oldSnapshot.toCharArray) + val deserialized = serializer.fromBinary(bytes, None).asInstanceOf[Snapshot] + + val deserializedDataStr = new String(deserialized.data.asInstanceOf[Array[Byte]], "utf-8") + dataStr should be(deserializedDataStr) + } } } @@ -270,4 +292,4 @@ class MySnapshotSerializer extends Serializer { case Some(c) ⇒ throw new Exception(s"unexpected manifest ${c}") case None ⇒ throw new Exception("no manifest") } -} \ No newline at end of file +}