diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index b2a9e7c85d..e43c075373 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -130,6 +130,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer if (persistent.persistenceId != Undefined) builder.setPersistenceId(persistent.persistenceId) if (persistent.sender != Actor.noSender) builder.setSender(Serialization.serializedActorPath(persistent.sender)) + if (persistent.manifest != PersistentRepr.Undefined) builder.setManifest(persistent.manifest) builder.setPayload(persistentPayloadBuilder(persistent.payload.asInstanceOf[AnyRef])) builder.setSequenceNr(persistent.sequenceNr) @@ -146,7 +147,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer serializer match { case ser2: SerializerWithStringManifest ⇒ val manifest = ser2.manifest(payload) - if (manifest != "") + if (manifest != PersistentRepr.Undefined) builder.setPayloadManifest(ByteString.copyFromUtf8(manifest)) case _ ⇒ if (serializer.includeManifest) diff --git a/akka-persistence/src/test/scala/akka/persistence/InmemEventAdapterSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EventAdapterSpec.scala similarity index 76% rename from akka-persistence/src/test/scala/akka/persistence/InmemEventAdapterSpec.scala rename to akka-persistence/src/test/scala/akka/persistence/EventAdapterSpec.scala index 52d5ee4bda..9595461d9d 100644 --- a/akka-persistence/src/test/scala/akka/persistence/InmemEventAdapterSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/EventAdapterSpec.scala @@ -5,15 +5,16 @@ package akka.persistence import akka.actor._ import akka.event.Logging +import akka.persistence.EventAdapterSpec.{ Tagged, UserDataChanged } import akka.persistence.journal.{ SingleEventSeq, EventSeq, EventAdapter } import akka.testkit.ImplicitSender import com.typesafe.config.{ Config, ConfigFactory } import scala.collection.immutable -object InmemEventAdapterSpec { +object EventAdapterSpec { - final val JournalModelClassName = classOf[InmemEventAdapterSpec].getCanonicalName + "$" + classOf[JournalModel].getSimpleName + final val JournalModelClassName = classOf[EventAdapterSpec].getCanonicalName + "$" + classOf[JournalModel].getSimpleName trait JournalModel { def payload: Any def tags: immutable.Set[String] @@ -23,7 +24,7 @@ object InmemEventAdapterSpec { override def tags = Set.empty } - final val DomainEventClassName = classOf[InmemEventAdapterSpec].getCanonicalName + "$" + classOf[DomainEvent].getSimpleName + final val DomainEventClassName = classOf[EventAdapterSpec].getCanonicalName + "$" + classOf[DomainEvent].getSimpleName trait DomainEvent final case class TaggedDataChanged(tags: immutable.Set[String], value: Int) extends DomainEvent final case class UserDataChanged(countryCode: String, age: Int) extends DomainEvent @@ -92,26 +93,26 @@ object InmemEventAdapterSpec { } -class InmemEventAdapterSpec(journalName: String, journalConfig: Config, adapterConfig: Config) +class EventAdapterSpec(journalName: String, journalConfig: Config, adapterConfig: Config) extends PersistenceSpec(journalConfig.withFallback(adapterConfig)) with ImplicitSender { - import InmemEventAdapterSpec._ + import EventAdapterSpec._ - def this() { + def this(journalName: String) { this("inmem", PersistenceSpec.config("inmem", "InmemPersistentTaggingSpec"), ConfigFactory.parseString( s""" |akka.persistence.journal { | | common-event-adapters { - | age = "${classOf[InmemEventAdapterSpec].getCanonicalName}$$UserAgeTaggingAdapter" - | replay-pass-through = "${classOf[InmemEventAdapterSpec].getCanonicalName}$$ReplayPassThroughAdapter" + | age = "${classOf[EventAdapterSpec].getCanonicalName}$$UserAgeTaggingAdapter" + | replay-pass-through = "${classOf[EventAdapterSpec].getCanonicalName}$$ReplayPassThroughAdapter" | } | | inmem { | event-adapters = $${akka.persistence.journal.common-event-adapters} | event-adapter-bindings { - | "${InmemEventAdapterSpec.DomainEventClassName}" = age - | "${InmemEventAdapterSpec.JournalModelClassName}" = age + | "${EventAdapterSpec.DomainEventClassName}" = age + | "${EventAdapterSpec.JournalModelClassName}" = age | } | } | @@ -120,7 +121,7 @@ class InmemEventAdapterSpec(journalName: String, journalConfig: Config, adapterC | dir = "journal-1" | | event-adapters { - | logging = "${classOf[InmemEventAdapterSpec].getCanonicalName}$$LoggingAdapter" + | logging = "${classOf[EventAdapterSpec].getCanonicalName}$$LoggingAdapter" | } | event-adapter-bindings { | "java.lang.Object" = logging @@ -133,8 +134,8 @@ class InmemEventAdapterSpec(journalName: String, journalConfig: Config, adapterC | | event-adapters = $${akka.persistence.journal.common-event-adapters} | event-adapter-bindings { - | "${InmemEventAdapterSpec.JournalModelClassName}" = replay-pass-through - | "${InmemEventAdapterSpec.DomainEventClassName}" = replay-pass-through + | "${EventAdapterSpec.JournalModelClassName}" = replay-pass-through + | "${EventAdapterSpec.DomainEventClassName}" = replay-pass-through | } | } | @@ -175,6 +176,12 @@ class InmemEventAdapterSpec(journalName: String, journalConfig: Config, adapterC toJournal(event, "with-actor-system") should equal(event) fromJournal(event, "with-actor-system") should equal(SingleEventSeq(event)) } + } + +} + +trait ReplayPassThrough { this: EventAdapterSpec ⇒ + "EventAdapter" must { "store events after applying adapter" in { val replayPassThroughJournalId = "replay-pass-through-adapter-journal" @@ -196,7 +203,12 @@ class InmemEventAdapterSpec(journalName: String, journalConfig: Config, adapterC expectMsg(Tagged(m1, Set("adult"))) expectMsg(m2) } + } +} + +trait NoAdapters { this: EventAdapterSpec ⇒ + "EventAdapter" must { "work when plugin defines no adapter" in { val p2 = persister("p2", journalId = "no-adapter") val m1 = UserDataChanged("name", 64) @@ -215,6 +227,15 @@ class InmemEventAdapterSpec(journalName: String, journalConfig: Config, adapterC expectMsg(m1) expectMsg(m2) } - } + } } + +// this style of testing allows us to try different leveldb journal plugin configurations +// because it always would use the same leveldb directory anyway (based on class name), +// yet we need different instances of the plugin. For inmem it does not matter, it can survive many instances. +class InmemEventAdapterSpec extends EventAdapterSpec("inmem") with ReplayPassThrough with NoAdapters + +class LeveldbBaseEventAdapterSpec extends EventAdapterSpec("leveldb") +class LeveldbReplayPassThroughEventAdapterSpec extends EventAdapterSpec("leveldb") with ReplayPassThrough +class LeveldbNoAdaptersEventAdapterSpec extends EventAdapterSpec("leveldb") with NoAdapters diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/EventAdaptersSpec.scala b/akka-persistence/src/test/scala/akka/persistence/journal/InmemEventAdaptersSpec.scala similarity index 99% rename from akka-persistence/src/test/scala/akka/persistence/journal/EventAdaptersSpec.scala rename to akka-persistence/src/test/scala/akka/persistence/journal/InmemEventAdaptersSpec.scala index 814357791c..8e8e0d78b4 100644 --- a/akka-persistence/src/test/scala/akka/persistence/journal/EventAdaptersSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/journal/InmemEventAdaptersSpec.scala @@ -8,7 +8,7 @@ import akka.actor.ExtendedActorSystem import akka.testkit.AkkaSpec import com.typesafe.config.ConfigFactory -class EventAdaptersSpec extends AkkaSpec { +class InmemEventAdaptersSpec extends AkkaSpec { val config = ConfigFactory.parseString( s""" diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala index 58bd94c7ba..0d4a61dcd1 100644 --- a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala @@ -304,6 +304,8 @@ class MessageSerializerRemotingSpec extends AkkaSpec(remote.withFallback(customS val remoteSystem = ActorSystem("remote", remote.withFallback(customSerializers)) val localActor = system.actorOf(Props(classOf[LocalActor], port(remoteSystem)), "local") + val serialization = SerializationExtension(system) + override protected def atStartup() { remoteSystem.actorOf(Props[RemoteActor], "remote") } @@ -328,6 +330,13 @@ class MessageSerializerRemotingSpec extends AkkaSpec(remote.withFallback(customS expectMsg("p.a.") expectMsg("p.b.") } + + "serialize manifest provided by EventAdapter" in { + val p1 = PersistentRepr(MyPayload("a"), sender = testActor).withManifest("manifest") + val bytes = serialization.serialize(p1).get + val back = serialization.deserialize(bytes, classOf[PersistentRepr]).get + require(p1.manifest == back.manifest) + } } }