From 80c68af14cfa313ce953d08e72375be5eb0028ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bjo=CC=88rn=20Antonsson?= Date: Mon, 26 May 2014 13:55:13 +0200 Subject: [PATCH] =per #15280 Make snapshot header length field deserialize correctly --- .../serialization/SnapshotSerializer.scala | 2 +- .../akka/persistence/PersistenceSpec.scala | 8 +- .../SnapshotSerializationSpec.scala | 93 +++++++++++++++++++ 3 files changed, 99 insertions(+), 4 deletions(-) create mode 100644 akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala index d2ffb0637b..741b3dfe46 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala @@ -98,5 +98,5 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer { 0 to 24 by 8 foreach { shift ⇒ outputStream.write(i >> shift) } private def readInt(inputStream: InputStream) = - (0 to 24 by 8).foldLeft(0) { (id, shift) ⇒ (id | (inputStream.read() >> shift)) } + (0 to 24 by 8).foldLeft(0) { (id, shift) ⇒ (id | (inputStream.read() << shift)) } } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala index 89376a63e4..66e32c8707 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala @@ -46,8 +46,10 @@ trait PersistenceSpec extends BeforeAndAfterEach with Cleanup { this: AkkaSpec } object PersistenceSpec { - def config(plugin: String, test: String, serialization: String = "on") = ConfigFactory.parseString( - s""" + def config(plugin: String, test: String, serialization: String = "on", extraConfig: Option[String] = None) = + extraConfig.map(ConfigFactory.parseString(_)).getOrElse(ConfigFactory.empty()).withFallback( + ConfigFactory.parseString( + s""" akka.actor.serialize-creators = ${serialization} akka.actor.serialize-messages = ${serialization} akka.persistence.publish-confirmations = on @@ -56,7 +58,7 @@ object PersistenceSpec { akka.persistence.journal.leveldb.dir = "target/journal-${test}" akka.persistence.snapshot-store.local.dir = "target/snapshots-${test}/" akka.test.single-expect-default = 10s - """) + """)) } trait Cleanup { this: AkkaSpec ⇒ diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala new file mode 100644 index 0000000000..7475f096e0 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala @@ -0,0 +1,93 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.persistence + +import akka.actor.{ Props, ActorRef } +import akka.serialization.Serializer +import akka.testkit.{ ImplicitSender, AkkaSpec } +import java.io._ + +object SnapshotSerializationSpec { + trait SerializationMarker + + // The FQN of the MySnapshot class needs to be long so that the total snapshot header length + // is bigger than 255 bytes (this happens to be 269) + object XXXXXXXXXXXXXXXXXXXX { + class MySnapshot(val id: String) extends SerializationMarker { + override def equals(obj: scala.Any) = obj match { + case s: MySnapshot ⇒ s.id.equals(id) + case _ ⇒ false + } + } + } + + import XXXXXXXXXXXXXXXXXXXX._ + + class MySerializer extends Serializer { + def includeManifest: Boolean = true + def identifier = 5177 + + def toBinary(obj: AnyRef): Array[Byte] = { + val bStream = new ByteArrayOutputStream() + val pStream = new PrintStream(bStream) + val msg: String = obj match { + case s: MySnapshot ⇒ s.id + case _ ⇒ "unknown" + } + pStream.println(msg) + bStream.toByteArray + } + + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { + val bStream = new ByteArrayInputStream(bytes) + val reader = new BufferedReader(new InputStreamReader(bStream)) + new MySnapshot(reader.readLine()) + } + } + + class TestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) { + def receive = { + case s: String ⇒ saveSnapshot(new MySnapshot(s)) + case SaveSnapshotSuccess(md) ⇒ probe ! md.sequenceNr + case SnapshotOffer(md, s) ⇒ probe ! ((md, s)) + case other ⇒ probe ! other + } + } + +} + +class SnapshotSerializationSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotSerializationSpec", serialization = "off", extraConfig = Some( + """ + |akka.logelevel = debug + |akka.actor { + | serializers { + | my-snapshot = "akka.persistence.SnapshotSerializationSpec$MySerializer" + | } + | serialization-bindings { + | "akka.persistence.SnapshotSerializationSpec$SerializationMarker" = my-snapshot + | } + |} + """.stripMargin))) with PersistenceSpec with ImplicitSender { + + import SnapshotSerializationSpec._ + import SnapshotSerializationSpec.XXXXXXXXXXXXXXXXXXXX._ + + "A processor with custom Serializer" must { + "be able to handle serialization header of more than 255 bytes" in { + val sProcessor = system.actorOf(Props(classOf[TestProcessor], name, testActor)) + val processorId = name + + sProcessor ! "blahonga" + expectMsg(0) + val lProcessor = system.actorOf(Props(classOf[TestProcessor], name, testActor)) + lProcessor ! Recover() + expectMsgPF() { + case (SnapshotMetadata(`processorId`, 0, timestamp), state) ⇒ + state should be(new MySnapshot("blahonga")) + timestamp should be > (0L) + } + } + } +}