diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceEventAdapterDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceEventAdapterDocTest.java new file mode 100644 index 0000000000..b373625b41 --- /dev/null +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceEventAdapterDocTest.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2015 Typesafe Inc. + */ + +package docs.persistence; + +import akka.persistence.journal.EventAdapter; +import akka.persistence.journal.EventSeq; + +public class PersistenceEventAdapterDocTest { + + @SuppressWarnings("unused") + static + //#identity-event-adapter + class MyEventAdapter extends EventAdapter { + @Override + public String manifest(Object event) { + return ""; // if no manifest needed, return "" + } + + @Override + public Object toJournal(Object event) { + return event; // identity + } + + @Override + public EventSeq fromJournal(Object event, String manifest) { + return EventSeq.single(event); // identity + } + } + //#identity-event-adapter +} \ No newline at end of file diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 2f48d5648b..795fb3105e 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -256,7 +256,7 @@ The ordering between events is still guaranteed ("evt-b-1" will be sent after "e .. _defer-java: Deferring actions until preceding persist handlers have executed ------------------------------------------------------------------ +---------------------------------------------------------------- Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of ''happens-after the previous ``persistAsync`` handlers have been invoked''. ``PersistentActor`` provides an utility method @@ -503,6 +503,54 @@ not accept more messages and it will throw ``AtLeastOnceDelivery.MaxUnconfirmedM The default value can be configured with the ``akka.persistence.at-least-once-delivery.max-unconfirmed-messages`` configuration key. The method can be overridden by implementation classes to return non-default values. +.. _event-adapters-java: + +Event Adapters +============== + +.. note:: + + Complete documentation featuring use-cases and implementation examples for this feature will follow shortly. + +In long running projects using event sourcing sometimes the need arrises to detach the data model from the domain model +completely. + +Event Adapters help in situations where: + +- **Version Migration** – existing events stored in *Version 1* should be "upcasted" to a new *Version 2* representation, + and the process of doing so involves actual code, not just changes on the serialization layer. For these scenarios + the ``toJournal`` function is usually an identity function, however the ``fromJournal`` is implemented as + ``v1.Event=>v2.Event``, performing the neccessary mapping inside the fromJournal method. + This technique is sometimes refered to as "upcasting" in other CQRS libraries. +- **Separating Domain and Data models** – thanks to EventAdapters it is possible to completely separate the domain model + from the model used to persist data in the Journals. For example one may want to use case classes in the + domain model, however persist their protocol-buffer (or any other binary serialization format) counter-parts to the Journal. + A simple ``toJournal:MyModel=>MyDataModel`` and ``fromJournal:MyDataModel=>MyModel`` adapter can be used to implement this feature. +- **Journal Specialized Data Types** – exposing data types understood by the underlying Journal, for example for data stores which + understand JSON it is possible to write an EventAdapter ``toJournal:Any=>JSON`` such that the Journal can *directly* store the + json instead of serializing the object to its binary representation. + +Implementing an EventAdapter is rather stright forward: + +.. includecode:: code/docs/persistence/PersistenceEventAdapterDocTest.java#identity-event-adapter + +Then in order for it to be used on events coming to and from the journal you must bind it using the below configuration syntax: + +.. includecode:: ../scala/code/docs/persistence/PersistenceEventAdapterDocSpec.scala#event-adapters-config + +It is possible to bind multiple adapters to one class *for recovery*, in which case the ``fromJournal`` methods of all +bound adapters will be applied to a given matching event (in order of definition in the configuration). Since each adapter may +return from ``0`` to ``n`` adapted events (called as ``EventSeq``), each adapter can investigate the event and if it should +indeed adapt it return the adapted event(s) for it, other adapters which do not have anything to contribute during this +adaptation simply return ``EventSeq.empty``. The adapted events are then delivered in-order to the ``PersistentActor`` during replay. + +.. note:: + More advanced techniques utilising advanced binary serialization formats such as protocol buffers or kryo / thrift / avro + will be documented very soon. These schema evolutions often may need to reach into the serialization layer, however + are much more powerful in terms of flexibly removing unused/deprecated classes from your classpath etc. + + + Storage plugins =============== diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceEventAdapterDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceEventAdapterDocSpec.scala new file mode 100644 index 0000000000..77e8eb041a --- /dev/null +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceEventAdapterDocSpec.scala @@ -0,0 +1,233 @@ +/* + * Copyright (C) 2015 Typesafe Inc. + */ + +package docs.persistence + +import akka.actor.{ ExtendedActorSystem, Props } +import akka.persistence.journal.{ EventAdapter, EventSeq } +import akka.persistence.{ PersistentActor, RecoveryCompleted } +import akka.testkit.{ AkkaSpec, TestProbe } +import com.google.gson.{ Gson, JsonElement } + +import scala.collection.immutable + +class PersistenceEventAdapterDocSpec(config: String) extends AkkaSpec(config) { + + def this() { + this(""" + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + + //#event-adapters-config + akka.persistence.journal { + inmem { + event-adapters { + tagging = "docs.persistence.MyTaggingEventAdapter" + user-upcasting = "docs.persistence.UserUpcastingEventAdapter" + item-upcasting = "docs.persistence.ItemUpcastingEventAdapter" + } + + event-adapter-bindings { + "docs.persistence.Item" = tagging + "docs.persistence.TaggedEvent" = tagging + "docs.persistence.v1.Event" = [user-upcasting, item-upcasting] + } + } + } + //#event-adapters-config + + + akka.persistence.journal { + auto-json-store { + class = "akka.persistence.journal.inmem.InmemJournal" # reuse inmem, as an example + + event-adapters { + auto-json = "docs.persistence.MyAutoJsonEventAdapter" + } + + event-adapter-bindings { + "docs.persistence.DomainEvent" = auto-json # to journal + "com.google.gson.JsonElement" = auto-json # from journal + } + } + + manual-json-store { + class = "akka.persistence.journal.inmem.InmemJournal" # reuse inmem, as an example + + event-adapters { + manual-json = "docs.persistence.MyManualJsonEventAdapter" + } + + event-adapter-bindings { + "docs.persistence.DomainEvent" = manual-json # to journal + "com.google.gson.JsonElement" = manual-json # from journal + } + } + } + """) + } + + "MyAutomaticJsonEventAdapter" must { + "demonstrate how to implement a JSON adapter" in { + val p = TestProbe() + + val props = Props(new PersistentActor { + override def persistenceId: String = "json-actor" + override def journalPluginId: String = "akka.persistence.journal.auto-json-store" + + override def receiveRecover: Receive = { + case RecoveryCompleted => // ignore... + case e => p.ref ! e + } + + override def receiveCommand: Receive = { + case c => persist(c) { e => p.ref ! e } + } + }) + + val p1 = system.actorOf(props) + val m1 = Person("Caplin", 42) + val m2 = Box(13) + p1 ! m1 + p1 ! m2 + p.expectMsg(m1) + p.expectMsg(m2) + + val p2 = system.actorOf(props) + p.expectMsg(m1) + p.expectMsg(m2) + } + } + + "MyManualJsonEventAdapter" must { + "demonstrate how to implement a JSON adapter" in { + val p = TestProbe() + + val props = Props(new PersistentActor { + override def persistenceId: String = "json-actor" + override def journalPluginId: String = "akka.persistence.journal.manual-json-store" + + override def receiveRecover: Receive = { + case RecoveryCompleted => // ignore... + case e => p.ref ! e + } + + override def receiveCommand: Receive = { + case c => persist(c) { e => p.ref ! e } + } + }) + + val p1 = system.actorOf(props) + val m1 = Person("Caplin", 42) + val m2 = Box(13) + p1 ! m1 + p1 ! m2 + p.expectMsg(m1) + p.expectMsg(m2) + + val p2 = system.actorOf(props) + p.expectMsg(m1) + p.expectMsg(m2) + } + } +} + +trait DomainEvent +case class Person(name: String, age: Int) extends DomainEvent +case class Box(length: Int) extends DomainEvent + +case class MyTaggingJournalModel(payload: Any, tags: immutable.Set[String]) + +//#identity-event-adapter +class MyEventAdapter(system: ExtendedActorSystem) extends EventAdapter { + override def manifest(event: Any): String = + "" // when no manifest needed, return "" + + override def toJournal(event: Any): Any = + event // identity + + override def fromJournal(event: Any, manifest: String): EventSeq = + EventSeq.single(event) // identity +} +//#identity-event-adapter + +/** + * This is an example adapter which completely takes care of domain<->json translation. + * It allows the journal to take care of the manifest handling, which is the FQN of the serialized class. + */ +class MyAutoJsonEventAdapter(system: ExtendedActorSystem) extends EventAdapter { + private val gson = new Gson + + override def manifest(event: Any): String = + event.getClass.getCanonicalName + + override def toJournal(event: Any): Any = gson.toJsonTree(event) + + override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single { + event match { + case json: JsonElement => + val clazz = system.dynamicAccess.getClassFor[Any](manifest).get + gson.fromJson(json, clazz) + } + } +} + +class MyUpcastingEventAdapter(system: ExtendedActorSystem) extends EventAdapter { + override def manifest(event: Any): String = "" + + override def toJournal(event: Any): Any = ??? + + override def fromJournal(event: Any, manifest: String): EventSeq = ??? +} + +/** + * This is an example adapter which completely takes care of domain<->json translation. + * It manually takes care of smuggling through the manifest even if the journal does not do anything in this regard, + * which is the case in this example since we're persisting to the inmem journal, which does nothing in terms of manifest. + */ +class MyManualJsonEventAdapter(system: ExtendedActorSystem) extends EventAdapter { + + private val gson = new Gson + + override def manifest(event: Any): String = event.getClass.getCanonicalName + + override def toJournal(event: Any): Any = { + val out = gson.toJsonTree(event).getAsJsonObject + + // optionally can include manifest in the adapted event. + // some journals will store the manifest transparently + out.addProperty("_manifest", manifest(event)) + + out + } + + override def fromJournal(event: Any, m: String): EventSeq = event match { + case json: JsonElement => + val manifest = json.getAsJsonObject.get("_manifest").getAsString + + val clazz = system.dynamicAccess.getClassFor[Any](manifest).get + EventSeq.single(gson.fromJson(json, clazz)) + } +} + +class MyTaggingEventAdapter(system: ExtendedActorSystem) extends EventAdapter { + override def manifest(event: Any): String = "" + + override def fromJournal(event: Any, manifest: String): EventSeq = event match { + case j: MyTaggingJournalModel => EventSeq.single(j) + } + + override def toJournal(event: Any): Any = { + event match { + case Person(_, age) if age >= 18 => MyTaggingJournalModel(event, tags = Set("adult")) + case Person(_, age) => MyTaggingJournalModel(event, tags = Set("minor")) + case _ => MyTaggingJournalModel(event, tags = Set.empty) + } + } +} + +object v1 { + trait Event + trait UserEvent extends v1.Event + trait ItemEvent extends v1.Event +} \ No newline at end of file diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala index b0e9a2cfa8..eb8479536b 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala @@ -92,6 +92,9 @@ object SharedLeveldbPluginDocSpec { //#shared-store-config akka.persistence.journal.leveldb-shared.store.dir = "target/shared" //#shared-store-config + //#event-adapter-config + akka.persistence.journal.leveldb-shared.adapter = "com.example.MyAdapter" + //#event-adapter-config """ //#shared-store-usage diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 3b29dea7ea..b25bbec4b4 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -494,6 +494,54 @@ not accept more messages and it will throw ``AtLeastOnceDelivery.MaxUnconfirmedM The default value can be configured with the ``akka.persistence.at-least-once-delivery.max-unconfirmed-messages`` configuration key. The method can be overridden by implementation classes to return non-default values. +.. _event-adapters-scala: + +Event Adapters +============== + +.. note:: + + Complete documentation featuring use-cases and implementation examples for this feature will follow shortly. + +In long running projects using event sourcing sometimes the need arrises to detach the data model from the domain model +completely. + +Event Adapters help in situations where: + +- **Version Migrations** – existing events stored in *Version 1* should be "upcasted" to a new *Version 2* representation, + and the process of doing so involves actual code, not just changes on the serialization layer. For these scenarios + the ``toJournal`` function is usually an identity function, however the ``fromJournal`` is implemented as + ``v1.Event=>v2.Event``, performing the neccessary mapping inside the fromJournal method. + This technique is sometimes refered to as "upcasting" in other CQRS libraries. +- **Separating Domain and Data models** – thanks to EventAdapters it is possible to completely separate the domain model + from the model used to persist data in the Journals. For example one may want to use case classes in the + domain model, however persist their protocol-buffer (or any other binary serialization format) counter-parts to the Journal. + A simple ``toJournal:MyModel=>MyDataModel`` and ``fromJournal:MyDataModel=>MyModel`` adapter can be used to implement this feature. +- **Journal Specialized Data Types** – exposing data types understood by the underlying Journal, for example for data stores which + understand JSON it is possible to write an EventAdapter ``toJournal:Any=>JSON`` such that the Journal can *directly* store the + json instead of serializing the object to its binary representation. + + +Implementing an EventAdapter is rather stright forward: + +.. includecode:: code/docs/persistence/PersistenceEventAdapterDocSpec.scala#identity-event-adapter + +Then in order for it to be used on events coming to and from the journal you must bind it using the below configuration syntax: + +.. includecode:: code/docs/persistence/PersistenceEventAdapterDocSpec.scala#event-adapters-config + +It is possible to bind multiple adapters to one class *for recovery*, in which case the ``fromJournal`` methods of all +bound adapters will be applied to a given matching event (in order of definition in the configuration). Since each adapter may +return from ``0`` to ``n`` adapted events (called as ``EventSeq``), each adapter can investigate the event and if it should +indeed adapt it return the adapted event(s) for it, other adapters which do not have anything to contribute during this +adaptation simply return ``EventSeq.empty``. The adapted events are then delivered in-order to the ``PersistentActor`` during replay. + +.. note:: + More advanced techniques utilising advanced binary serialization formats such as protocol buffers or kryo / thrift / avro + will be documented very soon. These schema evolutions often may need to reach into the serialization layer, however + are much more powerful in terms of flexibly removing unused/deprecated classes from your classpath etc. + + .. _persistent-fsm: Persistent FSM diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala index 01fc995bc6..d661389dc0 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala @@ -46,7 +46,7 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) { extension.journalFor(null) def replayedMessage(snr: Long, deleted: Boolean = false, confirms: Seq[String] = Nil): ReplayedMessage = - ReplayedMessage(PersistentImpl(s"a-${snr}", snr, pid, deleted, Actor.noSender)) + ReplayedMessage(PersistentImpl(s"a-${snr}", snr, pid, "", deleted, Actor.noSender)) def writeMessages(from: Int, to: Int, pid: String, sender: ActorRef): Unit = { val msgs = from to to map { i ⇒ PersistentRepr(payload = s"a-${i}", sequenceNr = i, persistenceId = pid, sender = sender) } @@ -56,7 +56,7 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) { probe.expectMsg(WriteMessagesSuccessful) from to to foreach { i ⇒ - probe.expectMsgPF() { case WriteMessageSuccess(PersistentImpl(payload, `i`, `pid`, _, `sender`), _) ⇒ payload should be(s"a-${i}") } + probe.expectMsgPF() { case WriteMessageSuccess(PersistentImpl(payload, `i`, `pid`, _, _, `sender`), _) ⇒ payload should be(s"a-${i}") } } } diff --git a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java index bd7be6de1b..03343f525d 100644 --- a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java +++ b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java @@ -98,6 +98,21 @@ public final class MessageFormats { */ com.google.protobuf.ByteString getSenderBytes(); + + // optional string manifest = 12; + /** + * optional string manifest = 12; + */ + boolean hasManifest(); + /** + * optional string manifest = 12; + */ + java.lang.String getManifest(); + /** + * optional string manifest = 12; + */ + com.google.protobuf.ByteString + getManifestBytes(); } /** * Protobuf type {@code PersistentMessage} @@ -183,6 +198,11 @@ public final class MessageFormats { sender_ = input.readBytes(); break; } + case 98: { + bitField0_ |= 0x00000020; + manifest_ = input.readBytes(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -387,12 +407,56 @@ public final class MessageFormats { } } + // optional string manifest = 12; + public static final int MANIFEST_FIELD_NUMBER = 12; + private java.lang.Object manifest_; + /** + * optional string manifest = 12; + */ + public boolean hasManifest() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional string manifest = 12; + */ + public java.lang.String getManifest() { + java.lang.Object ref = manifest_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + manifest_ = s; + } + return s; + } + } + /** + * optional string manifest = 12; + */ + public com.google.protobuf.ByteString + getManifestBytes() { + java.lang.Object ref = manifest_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + manifest_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + private void initFields() { payload_ = akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance(); sequenceNr_ = 0L; persistenceId_ = ""; deleted_ = false; sender_ = ""; + manifest_ = ""; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -427,6 +491,9 @@ public final class MessageFormats { if (((bitField0_ & 0x00000010) == 0x00000010)) { output.writeBytes(11, getSenderBytes()); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeBytes(12, getManifestBytes()); + } getUnknownFields().writeTo(output); } @@ -456,6 +523,10 @@ public final class MessageFormats { size += com.google.protobuf.CodedOutputStream .computeBytesSize(11, getSenderBytes()); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(12, getManifestBytes()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -587,6 +658,8 @@ public final class MessageFormats { bitField0_ = (bitField0_ & ~0x00000008); sender_ = ""; bitField0_ = (bitField0_ & ~0x00000010); + manifest_ = ""; + bitField0_ = (bitField0_ & ~0x00000020); return this; } @@ -639,6 +712,10 @@ public final class MessageFormats { to_bitField0_ |= 0x00000010; } result.sender_ = sender_; + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000020; + } + result.manifest_ = manifest_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -674,6 +751,11 @@ public final class MessageFormats { sender_ = other.sender_; onChanged(); } + if (other.hasManifest()) { + bitField0_ |= 0x00000020; + manifest_ = other.manifest_; + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1086,6 +1168,80 @@ public final class MessageFormats { return this; } + // optional string manifest = 12; + private java.lang.Object manifest_ = ""; + /** + * optional string manifest = 12; + */ + public boolean hasManifest() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional string manifest = 12; + */ + public java.lang.String getManifest() { + java.lang.Object ref = manifest_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + manifest_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string manifest = 12; + */ + public com.google.protobuf.ByteString + getManifestBytes() { + java.lang.Object ref = manifest_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + manifest_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * optional string manifest = 12; + */ + public Builder setManifest( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000020; + manifest_ = value; + onChanged(); + return this; + } + /** + * optional string manifest = 12; + */ + public Builder clearManifest() { + bitField0_ = (bitField0_ & ~0x00000020); + manifest_ = getDefaultInstance().getManifest(); + onChanged(); + return this; + } + /** + * optional string manifest = 12; + */ + public Builder setManifestBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000020; + manifest_ = value; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:PersistentMessage) } @@ -3900,21 +4056,22 @@ public final class MessageFormats { descriptor; static { java.lang.String[] descriptorData = { - "\n\024MessageFormats.proto\"\204\001\n\021PersistentMes" + + "\n\024MessageFormats.proto\"\226\001\n\021PersistentMes" + "sage\022#\n\007payload\030\001 \001(\0132\022.PersistentPayloa" + "d\022\022\n\nsequenceNr\030\002 \001(\003\022\025\n\rpersistenceId\030\003" + - " \001(\t\022\017\n\007deleted\030\004 \001(\010\022\016\n\006sender\030\013 \001(\t\"S\n" + - "\021PersistentPayload\022\024\n\014serializerId\030\001 \002(\005" + - "\022\017\n\007payload\030\002 \002(\014\022\027\n\017payloadManifest\030\003 \001" + - "(\014\"\356\001\n\033AtLeastOnceDeliverySnapshot\022\031\n\021cu" + - "rrentDeliveryId\030\001 \002(\003\022O\n\025unconfirmedDeli" + - "veries\030\002 \003(\01320.AtLeastOnceDeliverySnapsh" + - "ot.UnconfirmedDelivery\032c\n\023UnconfirmedDel", - "ivery\022\022\n\ndeliveryId\030\001 \002(\003\022\023\n\013destination" + - "\030\002 \002(\t\022#\n\007payload\030\003 \002(\0132\022.PersistentPayl" + - "oad\"F\n\032PersistentStateChangeEvent\022\027\n\017sta" + - "teIdentifier\030\001 \002(\t\022\017\n\007timeout\030\002 \001(\tB\"\n\036a" + - "kka.persistence.serializationH\001" + " \001(\t\022\017\n\007deleted\030\004 \001(\010\022\016\n\006sender\030\013 \001(\t\022\020\n" + + "\010manifest\030\014 \001(\t\"S\n\021PersistentPayload\022\024\n\014" + + "serializerId\030\001 \002(\005\022\017\n\007payload\030\002 \002(\014\022\027\n\017p" + + "ayloadManifest\030\003 \001(\014\"\356\001\n\033AtLeastOnceDeli" + + "verySnapshot\022\031\n\021currentDeliveryId\030\001 \002(\003\022" + + "O\n\025unconfirmedDeliveries\030\002 \003(\01320.AtLeast" + + "OnceDeliverySnapshot.UnconfirmedDelivery", + "\032c\n\023UnconfirmedDelivery\022\022\n\ndeliveryId\030\001 " + + "\002(\003\022\023\n\013destination\030\002 \002(\t\022#\n\007payload\030\003 \002(" + + "\0132\022.PersistentPayload\"F\n\032PersistentState" + + "ChangeEvent\022\027\n\017stateIdentifier\030\001 \002(\t\022\017\n\007" + + "timeout\030\002 \001(\tB\"\n\036akka.persistence.serial" + + "izationH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -3926,7 +4083,7 @@ public final class MessageFormats { internal_static_PersistentMessage_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_PersistentMessage_descriptor, - new java.lang.String[] { "Payload", "SequenceNr", "PersistenceId", "Deleted", "Sender", }); + new java.lang.String[] { "Payload", "SequenceNr", "PersistenceId", "Deleted", "Sender", "Manifest", }); internal_static_PersistentPayload_descriptor = getDescriptor().getMessageTypes().get(1); internal_static_PersistentPayload_fieldAccessorTable = new diff --git a/akka-persistence/src/main/protobuf/MessageFormats.proto b/akka-persistence/src/main/protobuf/MessageFormats.proto index b286b56711..32fec171c0 100644 --- a/akka-persistence/src/main/protobuf/MessageFormats.proto +++ b/akka-persistence/src/main/protobuf/MessageFormats.proto @@ -16,6 +16,7 @@ message PersistentMessage { // optional DeliveredMessage confirmMessage = 9; // Removed in 2.4 // optional string confirmTarget = 10; // Removed in 2.4 optional string sender = 11; // not stored in journal, needed for remote serialization + optional string manifest = 12; } message PersistentPayload { diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index 8f460c4c17..27e9176f01 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -4,10 +4,10 @@ package akka.persistence -import scala.collection.immutable - import akka.actor._ +import scala.collection.immutable + /** * INTERNAL API. * diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index 9001692bbf..9351fe9cfd 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -4,16 +4,17 @@ package akka.persistence -import scala.concurrent.duration._ -import com.typesafe.config.Config +import java.util.concurrent.atomic.AtomicReference + import akka.actor._ import akka.dispatch.Dispatchers -import akka.persistence.journal.AsyncWriteJournal +import akka.event.{ Logging, LoggingAdapter } +import akka.persistence.journal.{ AsyncWriteJournal, EventAdapters, IdentityEventAdapters } import akka.util.Helpers.ConfigOps -import akka.event.LoggingAdapter -import akka.event.Logging -import java.util.concurrent.atomic.AtomicReference +import com.typesafe.config.Config + import scala.annotation.tailrec +import scala.concurrent.duration._ /** * Persistence configuration. @@ -110,7 +111,7 @@ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider { def createExtension(system: ExtendedActorSystem): Persistence = new Persistence(system) def lookup() = Persistence /** INTERNAL API. */ - private[persistence] case class PluginHolder(actor: ActorRef) extends Extension + private[persistence] case class PluginHolder(actor: ActorRef, adapters: EventAdapters) extends Extension } /** @@ -142,7 +143,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { val settings = new PersistenceSettings(config) private def journalDispatchSelector(klaz: Class[_]): String = - if (classOf[AsyncWriteJournal].isAssignableFrom(klaz)) Dispatchers.DefaultDispatcherId else DefaultPluginDispatcherId + if (classOf[AsyncWriteJournal].isAssignableFrom(klaz)) Dispatchers.DefaultDispatcherId else DefaultPluginDispatcherId // TODO sure this is not inverted? private def snapshotDispatchSelector(klaz: Class[_]): String = DefaultPluginDispatcherId @@ -156,6 +157,43 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { /** Discovered persistence snapshot store plugins. */ private val snapshotPluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty) + /** + * Returns an [[EventAdapters]] object which serves as a per-journal collection of bound event adapters. + * If no adapters are registered for a given journal the EventAdapters object will simply return the identity + * adapter for each class, otherwise the most specific adapter matching a given class will be returned. + */ + @tailrec final def adaptersFor(journalPluginId: String): EventAdapters = { + val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId + val extensionIdMap = journalPluginExtensionId.get + extensionIdMap.get(configPath) match { + case Some(extensionId) ⇒ + extensionId(system).adapters + case None ⇒ + val extensionId = new ExtensionId[PluginHolder] { + override def createExtension(system: ExtendedActorSystem): PluginHolder = { + val plugin = createPlugin(configPath)(journalDispatchSelector) + val adapters = createAdapters(configPath) + PluginHolder(plugin, adapters) + } + } + journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) + adaptersFor(journalPluginId) // Recursive invocation. + } + } + + /** + * INTERNAL API + * Looks up [[EventAdapters]] by journal plugin's ActorRef. + */ + private[akka] final def adaptersFor(journalPluginActor: ActorRef): EventAdapters = { + journalPluginExtensionId.get().values collectFirst { + case ext if ext(system).actor == journalPluginActor ⇒ ext(system).adapters + } match { + case Some(adapters) ⇒ adapters + case _ ⇒ IdentityEventAdapters + } + } + /** * Returns a journal plugin actor identified by `journalPluginId`. * When empty, looks in `akka.persistence.journal.plugin` to find configuration entry path. @@ -170,8 +208,11 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { extensionId(system).actor case None ⇒ val extensionId = new ExtensionId[PluginHolder] { - override def createExtension(system: ExtendedActorSystem): PluginHolder = - PluginHolder(createPlugin(configPath)(journalDispatchSelector)) + override def createExtension(system: ExtendedActorSystem): PluginHolder = { + val plugin = createPlugin(configPath)(journalDispatchSelector) + val adapters = createAdapters(configPath) + PluginHolder(plugin, adapters) + } } journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) journalFor(journalPluginId) // Recursive invocation. @@ -192,8 +233,11 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { extensionId(system).actor case None ⇒ val extensionId = new ExtensionId[PluginHolder] { - override def createExtension(system: ExtendedActorSystem): PluginHolder = - PluginHolder(createPlugin(configPath)(snapshotDispatchSelector)) + override def createExtension(system: ExtendedActorSystem): PluginHolder = { + val plugin = createPlugin(configPath)(snapshotDispatchSelector) + val adapters = createAdapters(configPath) + PluginHolder(plugin, adapters) + } } snapshotPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) snapshotStoreFor(snapshotPluginId) // Recursive invocation. @@ -202,12 +246,12 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { private def createPlugin(configPath: String)(dispatcherSelector: Class[_] ⇒ String) = { require(!isEmpty(configPath) && system.settings.config.hasPath(configPath), - s"'reference.conf' is missing persistence plugin config path: '${configPath}'") + s"'reference.conf' is missing persistence plugin config path: '$configPath'") val pluginActorName = configPath val pluginConfig = system.settings.config.getConfig(configPath) val pluginClassName = pluginConfig.getString("class") - log.debug(s"Create plugin: ${pluginActorName} ${pluginClassName}") - val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get + log.debug(s"Create plugin: $pluginActorName $pluginClassName") + val pluginClass = system.dynamicAccess.getClassFor[Any](pluginClassName).get val pluginInjectConfig = if (pluginConfig.hasPath("inject-config")) pluginConfig.getBoolean("inject-config") else false val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else dispatcherSelector(pluginClass) val pluginActorArgs = if (pluginInjectConfig) List(pluginConfig) else Nil @@ -215,6 +259,11 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { system.systemActorOf(pluginActorProps, pluginActorName) } + private def createAdapters(configPath: String): EventAdapters = { + val pluginConfig = system.settings.config.getConfig(configPath) + EventAdapters(system, pluginConfig) + } + /** Creates a canonical persistent actor id from a persistent actor ref. */ def persistenceId(persistentActor: ActorRef): String = id(persistentActor) diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index cf8adacf97..7da9628b12 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -7,10 +7,7 @@ package akka.persistence import java.lang.{ Iterable ⇒ JIterable } import java.util.{ List ⇒ JList } -import scala.collection.immutable - import akka.actor.{ ActorContext, ActorRef } -import akka.japi.Util.immutableSeq import akka.pattern.PromiseActorRef import akka.persistence.serialization.Message @@ -40,13 +37,17 @@ private[persistence] final case class NonPersistentRepr(payload: Any, sender: Ac * @see [[akka.persistence.journal.AsyncRecovery]] */ trait PersistentRepr extends PersistentEnvelope with Message { - import scala.collection.JavaConverters._ /** * This persistent message's payload. */ def payload: Any + /** + * Returns the persistent payload's manifest if available + */ + def manifest: String + /** * Persistent id that journals a persistent message */ @@ -62,6 +63,11 @@ trait PersistentRepr extends PersistentEnvelope with Message { */ def withPayload(payload: Any): PersistentRepr + /** + * Creates a new persistent message with the specified `manifest`. + */ + def withManifest(manifest: String): PersistentRepr + /** * `true` if this message is marked as deleted. */ @@ -83,10 +89,10 @@ trait PersistentRepr extends PersistentEnvelope with Message { } object PersistentRepr { - /** - * Plugin API: value of an undefined processor id. - */ + /** Plugin API: value of an undefined persistenceId or manifest. */ val Undefined = "" + /** Plugin API: value of an undefined / identity event adapter. */ + val UndefinedId = 0 /** * Plugin API. @@ -95,9 +101,10 @@ object PersistentRepr { payload: Any, sequenceNr: Long = 0L, persistenceId: String = PersistentRepr.Undefined, + manifest: String = PersistentRepr.Undefined, deleted: Boolean = false, sender: ActorRef = null): PersistentRepr = - PersistentImpl(payload, sequenceNr, persistenceId, deleted, sender) + PersistentImpl(payload, sequenceNr, persistenceId, manifest, deleted, sender) /** * Java API, Plugin API. @@ -115,20 +122,21 @@ object PersistentRepr { * INTERNAL API. */ private[persistence] final case class PersistentImpl( - payload: Any, - sequenceNr: Long, + override val payload: Any, + override val sequenceNr: Long, override val persistenceId: String, - deleted: Boolean, - sender: ActorRef) extends PersistentRepr { + override val manifest: String, + override val deleted: Boolean, + override val sender: ActorRef) extends PersistentRepr { def withPayload(payload: Any): PersistentRepr = copy(payload = payload) - def update( - sequenceNr: Long, - persistenceId: String, - deleted: Boolean, - sender: ActorRef) = + def withManifest(manifest: String): PersistentRepr = + if (this.manifest == manifest) this + else copy(manifest = manifest) + + def update(sequenceNr: Long, persistenceId: String, deleted: Boolean, sender: ActorRef) = copy(sequenceNr = sequenceNr, persistenceId = persistenceId, deleted = deleted, sender = sender) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 532bfdadfb..1cbf85732a 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -5,20 +5,20 @@ package akka.persistence.journal -import scala.collection.immutable -import scala.concurrent.Future -import scala.util._ - import akka.actor._ import akka.pattern.pipe import akka.persistence._ +import scala.collection.immutable +import scala.concurrent.Future +import scala.util._ + /** * Abstract journal, optimized for asynchronous, non-blocking writes. */ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { - import JournalProtocol._ import AsyncWriteJournal._ + import JournalProtocol._ import context.dispatcher private val extension = Persistence(context.system) @@ -47,12 +47,15 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { // Send replayed messages and replay result to persistentActor directly. No need // to resequence replayed messages relative to written and looped messages. asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p ⇒ - if (!p.deleted || replayDeleted) persistentActor.tell(ReplayedMessage(p), Actor.noSender) + if (!p.deleted || replayDeleted) + adaptFromJournal(p).foreach { adaptedPersistentRepr ⇒ + persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender) + } } map { case _ ⇒ ReplayMessagesSuccess } recover { case e ⇒ ReplayMessagesFailure(e) - } pipeTo (persistentActor) onSuccess { + } pipeTo persistentActor onSuccess { case _ if publish ⇒ context.system.eventStream.publish(r) } case ReadHighestSequenceNr(fromSequenceNr, persistenceId, persistentActor) ⇒ @@ -62,7 +65,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { highest ⇒ ReadHighestSequenceNrSuccess(highest) } recover { case e ⇒ ReadHighestSequenceNrFailure(e) - } pipeTo (persistentActor) + } pipeTo persistentActor case d @ DeleteMessagesTo(persistenceId, toSequenceNr, permanent) ⇒ asyncDeleteMessagesTo(persistenceId, toSequenceNr, permanent) onComplete { case Success(_) ⇒ if (publish) context.system.eventStream.publish(d) @@ -85,6 +88,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { */ def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] //#journal-plugin-api + } /** diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala index ebcc71aa02..036e0367e5 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala @@ -20,7 +20,7 @@ import akka.util._ * * A journal that delegates actual storage to a target actor. For testing only. */ -private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash { +private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash with ActorLogging { import AsyncWriteProxy._ import AsyncWriteTarget._ @@ -32,7 +32,8 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash store = ref unstashAll() context.become(initialized) - case _ ⇒ stash() + case x ⇒ + stash() } implicit def timeout: Timeout @@ -43,8 +44,8 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = (store ? DeleteMessagesTo(persistenceId, toSequenceNr, permanent)).mapTo[Unit] - def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) ⇒ Unit): Future[Unit] = { - val replayCompletionPromise = Promise[Unit] + def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] = { + val replayCompletionPromise = Promise[Unit]() val mediator = context.actorOf(Props(classOf[ReplayMediator], replayCallback, replayCompletionPromise, timeout.duration).withDeploy(Deploy.local)) store.tell(ReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max), mediator) replayCompletionPromise.future diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapter.scala b/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapter.scala new file mode 100644 index 0000000000..b54c8f4b69 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapter.scala @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2015 Typesafe Inc. + */ + +package akka.persistence.journal + +import scala.annotation.varargs +import scala.collection.immutable + +/** + * Facility to convert from and to specialised data models, as may be required by specialized persistence Journals. + * + * Typical use cases include (but are not limited to): + *
    + *
  • adding metadata, a.k.a. "tagging" - by wrapping objects into tagged counterparts
  • + *
  • manually converting to the Journals storage format, such as JSON, BSON or any specialised binary format
  • + *
  • adapting incoming events in any way before persisting them by the journal
  • + *
+ */ +abstract class EventAdapter { + //#event-adapter-api + /** + * Return the manifest (type hint) that will be provided in the `fromJournal` method. + * Use `""` if manifest is not needed. + */ + def manifest(event: Any): String + + /** + * Convert domain event to journal event type. + * + * Some journal may require a specific type to be returned to them, + * for example if a primary key has to be associated with each event then a journal + * may require adapters to return `com.example.myjournal.EventWithPrimaryKey(event, key)`. + * + * The `toJournal` adaptation must be an 1-to-1 transformation. + * It is not allowed to drop incoming events during the `toJournal` adaptation. + * + * @param event the application-side domain event to be adapted to the journal model + * @return the adapted event object, possibly the same object if no adaptation was performed + */ + def toJournal(event: Any): Any + + /** + * Convert a event from its journal model to the applications domain model. + * + * One event may be adapter into multiple (or none) events which should be delivered to the [[akka.persistence.PersistentActor]]. + * Use the specialised [[EventSeq.single]] method to emit exactly one event, or [[EventSeq.empty]] in case the adapter + * is not handling this event. Multiple [[EventAdapter]] instances are applied in order as defined in configuration + * and their emitted event seqs are concatenated and delivered in order to the PersistentActor. + * + * @param event event to be adapted before delivering to the PersistentActor + * @param manifest optionally provided manifest (type hint) in case the Adapter has stored one for this event, `""` if none + * @return sequence containing the adapted events (possibly zero) which will be delivered to the PersistentActor + */ + def fromJournal(event: Any, manifest: String): EventSeq + + //#event-adapter-api +} + +sealed abstract class EventSeq { + def events: immutable.Seq[Any] +} +object EventSeq { + /** Java API */ + final def empty: EventSeq = EmptyEventSeq + + /** Java API */ + final def single(event: Any): EventSeq = new SingleEventSeq(event) + + /** Java API */ + @varargs final def create(events: Any*): EventSeq = EventsSeq(events.toList) + final def apply(events: Any*): EventSeq = EventsSeq(events.toList) +} +final case class SingleEventSeq(event: Any) extends EventSeq { // TODO try to make it a value class, would save allocations + override val events: immutable.Seq[Any] = List(event) + override def toString = s"SingleEventSeq($event)" +} + +sealed trait EmptyEventSeq extends EventSeq +final object EmptyEventSeq extends EmptyEventSeq { + override def events = Nil +} + +final case class EventsSeq[E](events: immutable.Seq[E]) extends EventSeq + +/** No-op model adapter which passes through the incoming events as-is. */ +final case object IdentityEventAdapter extends EventAdapter { + override def toJournal(event: Any): Any = event + override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single(event) + override def manifest(event: Any): String = "" +} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapters.scala b/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapters.scala new file mode 100644 index 0000000000..e8b1efcd4f --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapters.scala @@ -0,0 +1,156 @@ +/* + * Copyright (C) 2015 Typesafe Inc. + */ + +package akka.persistence.journal + +import java.util +import java.util.concurrent.ConcurrentHashMap + +import akka.actor.ExtendedActorSystem +import akka.event.{ Logging, LoggingAdapter } +import com.typesafe.config.Config + +import scala.collection.immutable +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag +import scala.util.Try + +/** INTERNAL API */ +private[akka] class EventAdapters( + map: ConcurrentHashMap[Class[_], EventAdapter], + bindings: immutable.Seq[(Class[_], EventAdapter)], + log: LoggingAdapter) { + + /** + * Finds the "most specific" matching adapter for the given class (i.e. it may return an adapter that can work on a + * interface implemented by the given class if no direct match is found). + * + * Falls back to [[IdentityEventAdapter]] if no adapter was defined for the given class. + */ + def get(clazz: Class[_]): EventAdapter = { + map.get(clazz) match { + case null ⇒ // bindings are ordered from most specific to least specific + val value = bindings filter { + _._1 isAssignableFrom clazz + } match { + case (_, bestMatch) +: _ ⇒ bestMatch + case _ ⇒ IdentityEventAdapter + } + map.putIfAbsent(clazz, value) match { + case null ⇒ + log.debug(s"Using EventAdapter: {} for event [{}]", value.getClass.getName, clazz.getName) + value + case some ⇒ some + } + case value ⇒ value + } + } + + override def toString = + s"${getClass.getName}($map, $bindings)" + +} + +/** INTERNAL API */ +private[akka] object EventAdapters { + type Name = String + type BoundAdapters = immutable.Seq[String] + type FQN = String + type ClassHandler = (Class[_], EventAdapter) + + def apply(system: ExtendedActorSystem, config: Config): EventAdapters = { + val adapters = configToMap(config, "event-adapters") + val adapterBindings = configToListMap(config, "event-adapter-bindings") + apply(system, adapters, adapterBindings) + } + + private def apply( + system: ExtendedActorSystem, + adapters: Map[Name, FQN], + adapterBindings: Map[FQN, BoundAdapters]): EventAdapters = { + + val adapterNames = adapters.keys.toSet + for { + (fqn, boundToAdapters) ← adapterBindings + boundAdapter ← boundToAdapters + } require(adapterNames(boundAdapter.toString), + s"$fqn was bound to undefined event-adapter: $boundAdapter (bindings: ${boundToAdapters.mkString("[", ", ", "]")}, known adapters: ${adapters.keys.mkString})") + + // A Map of handler from alias to implementation (i.e. class implementing akka.serialization.Serializer) + // For example this defines a handler named 'country': `"country" -> com.example.comain.CountryTagsAdapter` + val handlers = for ((k: String, v: String) ← adapters) yield k -> instantiate[EventAdapter](v, system).get + + // bindings is a Seq of tuple representing the mapping from Class to handler. + // It is primarily ordered by the most specific classes first, and secondly in the configured order. + val bindings: immutable.Seq[ClassHandler] = { + val bs = for ((k: FQN, as: BoundAdapters) ← adapterBindings) + yield if (as.size == 1) (system.dynamicAccess.getClassFor[Any](k).get, handlers(as.head)) + else (system.dynamicAccess.getClassFor[Any](k).get, CombinedReadEventAdapter(as.map(handlers))) + + sort(bs) + } + + val backing = (new ConcurrentHashMap[Class[_], EventAdapter] /: bindings) { case (map, (c, s)) ⇒ map.put(c, s); map } + + new EventAdapters(backing, bindings, system.log) + } + + /** INTERNAL API */ + private[akka] case class CombinedReadEventAdapter(adapters: immutable.Seq[EventAdapter]) extends EventAdapter { + private def onlyReadSideException = new IllegalStateException("CombinedReadEventAdapter must not be used when writing (creating manifests) events!") + override def manifest(event: Any): String = throw onlyReadSideException + override def toJournal(event: Any): Any = throw onlyReadSideException + + override def fromJournal(event: Any, manifest: String): EventSeq = + EventSeq(adapters.flatMap(_.fromJournal(event, manifest).events): _*) // TODO could we could make EventSeq flatMappable + + override def toString = + s"CombinedReadEventAdapter(${adapters.map(_.getClass.getCanonicalName).mkString(",")})" + } + + /** + * Tries to load the specified Serializer by the fully-qualified name; the actual + * loading is performed by the system’s [[akka.actor.DynamicAccess]]. + */ + private def instantiate[T: ClassTag](fqn: FQN, system: ExtendedActorSystem): Try[T] = + system.dynamicAccess.createInstanceFor[T](fqn, List(classOf[ExtendedActorSystem] -> system)) recoverWith { + case _: NoSuchMethodException ⇒ system.dynamicAccess.createInstanceFor[T](fqn, Nil) + } + + /** + * Sort so that subtypes always precede their supertypes, but without + * obeying any order between unrelated subtypes (insert sort). + */ + private def sort[T](in: Iterable[(Class[_], T)]): immutable.Seq[(Class[_], T)] = + (new ArrayBuffer[(Class[_], T)](in.size) /: in) { (buf, ca) ⇒ + buf.indexWhere(_._1 isAssignableFrom ca._1) match { + case -1 ⇒ buf append ca + case x ⇒ buf insert (x, ca) + } + buf + }.to[immutable.Seq] + + private final def configToMap(config: Config, path: String): Map[String, String] = { + import scala.collection.JavaConverters._ + if (config.hasPath(path)) { + config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) ⇒ k -> v.toString } + } else Map.empty + } + + private final def configToListMap(config: Config, path: String): Map[String, immutable.Seq[String]] = { + import scala.collection.JavaConverters._ + if (config.hasPath(path)) { + config.getConfig(path).root.unwrapped.asScala.toMap map { + case (k, v: util.ArrayList[_]) if v.isInstanceOf[util.ArrayList[_]] ⇒ k -> v.asScala.map(_.toString).toList + case (k, v) ⇒ k -> List(v.toString) + } + } else Map.empty + } + +} + +private[akka] case object IdentityEventAdapters extends EventAdapters(null, null, null) { + override def get(clazz: Class[_]): EventAdapter = IdentityEventAdapter + override def toString = Logging.simpleName(IdentityEventAdapters) +} \ No newline at end of file diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala index e28dff2620..e97f5e00f1 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala @@ -8,14 +8,14 @@ package akka.persistence.journal import scala.collection.immutable import scala.util._ -import akka.actor.Actor +import akka.actor.{ ActorLogging, Actor } import akka.pattern.pipe import akka.persistence._ /** * Abstract journal, optimized for synchronous writes. */ -trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { +trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery with ActorLogging { import JournalProtocol._ import context.dispatcher @@ -39,22 +39,28 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { } throw e } + case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor, replayDeleted) ⇒ asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p ⇒ - if (!p.deleted || replayDeleted) persistentActor.tell(ReplayedMessage(p), p.sender) + if (!p.deleted || replayDeleted) + adaptFromJournal(p).foreach { adaptedPersistentRepr ⇒ + persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), adaptedPersistentRepr.sender) + } } map { case _ ⇒ ReplayMessagesSuccess } recover { case e ⇒ ReplayMessagesFailure(e) - } pipeTo (persistentActor) onSuccess { + } pipeTo persistentActor onSuccess { case _ if publish ⇒ context.system.eventStream.publish(r) } + case ReadHighestSequenceNr(fromSequenceNr, persistenceId, persistentActor) ⇒ asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).map { highest ⇒ ReadHighestSequenceNrSuccess(highest) } recover { case e ⇒ ReadHighestSequenceNrFailure(e) - } pipeTo (persistentActor) + } pipeTo persistentActor + case d @ DeleteMessagesTo(persistenceId, toSequenceNr, permanent) ⇒ Try(deleteMessagesTo(persistenceId, toSequenceNr, permanent)) match { case Success(_) ⇒ if (publish) context.system.eventStream.publish(d) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/WriteJournalBase.scala b/akka-persistence/src/main/scala/akka/persistence/journal/WriteJournalBase.scala index 0265dcfbfc..452ff62d41 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/WriteJournalBase.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/WriteJournalBase.scala @@ -4,16 +4,37 @@ package akka.persistence.journal -import akka.persistence.{ PersistentRepr, PersistentEnvelope } import akka.actor.Actor +import akka.persistence.{ Persistence, PersistentEnvelope, PersistentRepr } + import scala.collection.immutable private[akka] trait WriteJournalBase { this: Actor ⇒ + lazy val persistence = Persistence(context.system) + private def eventAdapters = persistence.adaptersFor(self) + protected def preparePersistentBatch(rb: immutable.Seq[PersistentEnvelope]): immutable.Seq[PersistentRepr] = - rb.collect { - case p: PersistentRepr ⇒ p.update(sender = Actor.noSender) // don't store sender + rb.collect { // collect instead of flatMap to avoid Some allocations + case p: PersistentRepr ⇒ adaptToJournal(p.update(sender = Actor.noSender)) // don't store the sender } + /** INTERNAL API */ + private[akka] final def adaptFromJournal(repr: PersistentRepr): immutable.Seq[PersistentRepr] = + eventAdapters.get(repr.payload.getClass).fromJournal(repr.payload, repr.manifest).events map { adaptedPayload ⇒ + repr.withPayload(adaptedPayload) + } + + /** INTERNAL API */ + private[akka] final def adaptToJournal(repr: PersistentRepr): PersistentRepr = { + val payload = repr.payload + val adapter = eventAdapters.get(payload.getClass) + val manifest = adapter.manifest(payload) + + repr + .withPayload(adapter.toJournal(payload)) + .withManifest(manifest) + } + } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala index 299fcd8844..79fe0b5c14 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala @@ -10,8 +10,7 @@ import scala.language.postfixOps import akka.actor._ import akka.persistence._ -import akka.persistence.journal.AsyncWriteProxy -import akka.persistence.journal.AsyncWriteTarget +import akka.persistence.journal.{ WriteJournalBase, AsyncWriteProxy, AsyncWriteTarget } import akka.util.Timeout /** @@ -72,7 +71,7 @@ private[persistence] trait InmemMessages { /** * INTERNAL API. */ -private[persistence] class InmemStore extends Actor with InmemMessages { +private[persistence] class InmemStore extends Actor with InmemMessages with WriteJournalBase { import AsyncWriteTarget._ def receive = { @@ -83,7 +82,7 @@ private[persistence] class InmemStore extends Actor with InmemMessages { case DeleteMessagesTo(pid, tsnr, true) ⇒ sender() ! (1L to tsnr foreach { snr ⇒ delete(pid, snr) }) case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ - read(pid, fromSnr, toSnr, max).foreach(sender() ! _) + read(pid, fromSnr, toSnr, max).foreach { sender() ! _ } sender() ! ReplaySuccess case ReadHighestSequenceNr(persistenceId, _) ⇒ sender() ! highestSequenceNr(persistenceId) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala index 5c244a9304..828b8f4053 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala @@ -7,20 +7,19 @@ package akka.persistence.journal.leveldb import java.io.File -import scala.collection.immutable -import scala.util._ - -import org.iq80.leveldb._ - import akka.actor._ import akka.persistence._ -import akka.persistence.journal.AsyncWriteTarget +import akka.persistence.journal.{ WriteJournalBase, AsyncWriteTarget } import akka.serialization.SerializationExtension +import org.iq80.leveldb._ + +import scala.collection.immutable +import scala.util._ /** * INTERNAL API. */ -private[persistence] trait LeveldbStore extends Actor with LeveldbIdMapping with LeveldbRecovery { +private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with LeveldbIdMapping with LeveldbRecovery { val configPath: String val config = context.system.settings.config.getConfig(configPath) @@ -112,11 +111,11 @@ class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.le import AsyncWriteTarget._ def receive = { - case WriteMessages(msgs) ⇒ sender() ! writeMessages(msgs) + case WriteMessages(msgs) ⇒ sender() ! writeMessages(preparePersistentBatch(msgs)) case DeleteMessagesTo(pid, tsnr, permanent) ⇒ sender() ! deleteMessagesTo(pid, tsnr, permanent) case ReadHighestSequenceNr(pid, fromSequenceNr) ⇒ sender() ! readHighestSequenceNr(numericId(pid)) case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ - Try(replayMessages(numericId(pid), fromSnr, toSnr, max)(sender() ! _)) match { + Try(replayMessages(numericId(pid), fromSnr, toSnr, max)(p ⇒ adaptFromJournal(p).foreach { sender() ! _ })) match { case Success(max) ⇒ sender() ! ReplaySuccess case Failure(cause) ⇒ sender() ! ReplayFailure(cause) } diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index f785ed9fe1..cb0d86427a 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -4,20 +4,19 @@ package akka.persistence.serialization -import scala.concurrent.duration -import scala.concurrent.duration.Duration -import scala.language.existentials -import com.google.protobuf._ import akka.actor.{ ActorPath, ExtendedActorSystem } -import akka.japi.Util.immutableSeq +import akka.persistence.AtLeastOnceDelivery.{ AtLeastOnceDeliverySnapshot ⇒ AtLeastOnceDeliverySnap, UnconfirmedDelivery } import akka.persistence._ +import akka.persistence.fsm.PersistentFsmActor.StateChangeEvent import akka.persistence.serialization.MessageFormats._ import akka.serialization._ -import akka.persistence.AtLeastOnceDelivery.{ AtLeastOnceDeliverySnapshot ⇒ AtLeastOnceDeliverySnap } -import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery +import com.google.protobuf._ + import scala.collection.immutable.VectorBuilder -import akka.persistence.fsm.PersistentFsmActor.StateChangeEvent +import scala.concurrent.duration import akka.actor.Actor +import scala.concurrent.duration.Duration +import scala.language.existentials /** * Marker trait for all protobuf-serializable messages in `akka.persistence`. @@ -164,6 +163,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer payload(persistentMessage.getPayload), persistentMessage.getSequenceNr, if (persistentMessage.hasPersistenceId) persistentMessage.getPersistenceId else Undefined, + if (persistentMessage.hasManifest) persistentMessage.getManifest else Undefined, persistentMessage.getDeleted, if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else Actor.noSender) } diff --git a/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala new file mode 100644 index 0000000000..868f56f2fe --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala @@ -0,0 +1,241 @@ +/* + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence + +import java.io.File + +import akka.actor._ +import akka.persistence.EndToEndEventAdapterSpec.NewA +import akka.persistence.journal.{ EventSeq, EventAdapter } +import akka.testkit.{ ImplicitSender, WatchedByCoroner, AkkaSpec, TestProbe } +import com.typesafe.config.{ Config, ConfigFactory } +import org.apache.commons.io.FileUtils +import org.scalatest.{ WordSpecLike, Matchers, BeforeAndAfterAll } + +import scala.concurrent.Await +import scala.concurrent.duration._ + +object EndToEndEventAdapterSpec { + + trait AppModel { def payload: Any } + case class A(payload: Any) extends AppModel + case class B(payload: Any) extends AppModel + case class NewA(payload: Any) extends AppModel + case class NewB(payload: Any) extends AppModel + + case class JSON(payload: Any) + + class AEndToEndAdapter(system: ExtendedActorSystem) extends EventAdapter { + override def manifest(event: Any): String = event.getClass.getCanonicalName + + override def toJournal(event: Any): Any = + event match { case m: AppModel ⇒ JSON(m.payload) } + override def fromJournal(event: Any, manifest: String): EventSeq = event match { + case m: JSON if m.payload.toString.startsWith("a") ⇒ EventSeq.single(A(m.payload)) + case _ ⇒ EventSeq.empty + } + } + class NewAEndToEndAdapter(system: ExtendedActorSystem) extends EventAdapter { + override def manifest(event: Any): String = event.getClass.getCanonicalName + + override def toJournal(event: Any): Any = + event match { case m: AppModel ⇒ JSON(m.payload) } + override def fromJournal(event: Any, manifest: String): EventSeq = event match { + case m: JSON if m.payload.toString.startsWith("a") ⇒ EventSeq.single(NewA(m.payload)) + case _ ⇒ EventSeq.empty + } + } + class BEndToEndAdapter(system: ExtendedActorSystem) extends EventAdapter { + override def manifest(event: Any): String = event.getClass.getCanonicalName + + override def toJournal(event: Any): Any = + event match { case m: AppModel ⇒ JSON(m.payload) } + override def fromJournal(event: Any, manifest: String): EventSeq = event match { + case m: JSON if m.payload.toString.startsWith("b") ⇒ EventSeq.single(B(m.payload)) + case _ ⇒ EventSeq.empty + } + } + class NewBEndToEndAdapter(system: ExtendedActorSystem) extends EventAdapter { + override def manifest(event: Any): String = event.getClass.getCanonicalName + + override def toJournal(event: Any): Any = + event match { case m: AppModel ⇒ JSON(m.payload) } + override def fromJournal(event: Any, manifest: String): EventSeq = event match { + case m: JSON if m.payload.toString.startsWith("b") ⇒ EventSeq.single(NewB(m.payload)) + case _ ⇒ EventSeq.empty + } + } + + class EndToEndAdapterActor(name: String, override val journalPluginId: String, probe: Option[ActorRef]) + extends NamedPersistentActor(name) with PersistentActor { + + var state: List[Any] = Nil + + val persistIncoming: Receive = { + case GetState ⇒ + state.reverse.foreach { sender() ! _ } + case in ⇒ + persist(in) { e ⇒ + state ::= e + sender() ! e + } + } + + override def receiveRecover = { + case RecoveryCompleted ⇒ // ignore + case e ⇒ state ::= e + } + override def receiveCommand = persistIncoming + + override def onReplayFailure(cause: Throwable, event: Option[Any]): Unit = + probe.foreach { _ ! cause } + } + +} + +abstract class EndToEndEventAdapterSpec(journalName: String, journalConfig: Config) + extends WordSpecLike with Matchers with BeforeAndAfterAll { + import EndToEndEventAdapterSpec._ + + val storageLocations = List("akka.persistence.journal.leveldb.dir") + .map(s ⇒ new File(journalConfig.getString(s))) + + override protected def beforeAll() { + storageLocations.foreach(FileUtils.deleteDirectory) + } + + override protected def afterAll() { + storageLocations.foreach(FileUtils.deleteDirectory) + } + + val noAdaptersConfig = ConfigFactory.parseString("") + + val adaptersConfig = ConfigFactory.parseString( + s""" + |akka.persistence.journal { + | $journalName { + | event-adapters { + | a = "${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$AEndToEndAdapter" + | b = "${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$BEndToEndAdapter" + | } + | event-adapter-bindings { + | # to journal + | "${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$A" = a + | "${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$B" = b + | # from journal + | "${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$JSON" = [a, b] + | + | } + | } + |} + """.stripMargin) + + val newAdaptersConfig = ConfigFactory.parseString( + s""" + |akka.persistence.journal { + | $journalName { + | event-adapters { + | a = "${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$NewAEndToEndAdapter" + | b = "${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$NewBEndToEndAdapter" + | } + | event-adapter-bindings { + | # to journal + | "${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$A" = a + | "${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$B" = b + | # from journal + | "${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$JSON" = [a, b] + | + | } + | } + |} + """.stripMargin) + + def persister(name: String, probe: Option[ActorRef] = None)(implicit system: ActorSystem) = + system.actorOf(Props(classOf[EndToEndAdapterActor], name, "akka.persistence.journal." + journalName, probe), name) + + def withActorSystem[T](name: String, config: Config)(block: ActorSystem ⇒ T): T = { + val system = ActorSystem(name, journalConfig withFallback config) + try block(system) finally Await.ready(system.terminate(), 3.seconds) + } + + "EventAdapters in end-to-end scenarios" must { + + "use the same adapter when reading as was used when writing to the journal" in + withActorSystem("SimpleSystem", adaptersConfig) { implicit system ⇒ + val p = TestProbe() + implicit val ref = p.ref + + val p1 = persister("p1") + val a = A("a1") + val b = B("b1") + p1 ! a + p1 ! b + p.expectMsg(a) + p.expectMsg(b) + + p.watch(p1) + p1 ! PoisonPill + p.expectTerminated(p1) + + val p11 = persister("p1") + p11 ! GetState + p.expectMsg(A("a1")) + p.expectMsg(B("b1")) + } + + "allow using an adapter, when write was performed without an adapter" in { + val persistentName = "p2" + + withActorSystem("NoAdapterSystem", adaptersConfig) { implicit system ⇒ + val p = TestProbe() + implicit val ref = p.ref + + val p2 = persister(persistentName) + val a = A("a1") + val b = B("b1") + p2 ! a + p2 ! b + p.expectMsg(a) + p.expectMsg(b) + + p.watch(p2) + p2 ! PoisonPill + p.expectTerminated(p2) + + val p11 = persister(persistentName) + p11 ! GetState + p.expectMsg(A("a1")) + p.expectMsg(B("b1")) + } + + withActorSystem("NowAdaptersAddedSystem", newAdaptersConfig) { implicit system ⇒ + val p = TestProbe() + implicit val ref = p.ref + + val p22 = persister(persistentName) + p22 ! GetState + p.expectMsg(NewA("a1")) + p.expectMsg(NewB("b1")) + } + } + + "give nice error message when unable to play back as adapter does not exist" in { + // after some time, we start the system a-new... + // and the adapter originally used for adapting A is missing from the configuration! + val journalPath = s"akka.persistence.journal.$journalName" + val missingAdapterConfig = adaptersConfig + .withoutPath(s"$journalPath.event-adapters.a") + .withoutPath(s"""$journalPath.event-adapter-bindings."${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$A"""") + + intercept[IllegalArgumentException] { + withActorSystem("MissingAdapterSystem", journalConfig.withFallback(missingAdapterConfig)) { implicit system2 ⇒ + Persistence(system2).adaptersFor(s"akka.persistence.journal.$journalName").get(classOf[String]) + } + }.getMessage should include("was bound to undefined event-adapter: a (bindings: [a, b], known adapters: b)") + } + } +} + +// needs persistence between actor systems, thus not running with the inmem journal +class LeveldbEndToEndEventAdapterSpec extends EndToEndEventAdapterSpec("leveldb", PersistenceSpec.config("leveldb", "LeveldbEndToEndEventAdapterSpec")) diff --git a/akka-persistence/src/test/scala/akka/persistence/InmemEventAdapterSpec.scala b/akka-persistence/src/test/scala/akka/persistence/InmemEventAdapterSpec.scala new file mode 100644 index 0000000000..52d5ee4bda --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/InmemEventAdapterSpec.scala @@ -0,0 +1,220 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.persistence + +import akka.actor._ +import akka.event.Logging +import akka.persistence.journal.{ SingleEventSeq, EventSeq, EventAdapter } +import akka.testkit.ImplicitSender +import com.typesafe.config.{ Config, ConfigFactory } + +import scala.collection.immutable + +object InmemEventAdapterSpec { + + final val JournalModelClassName = classOf[InmemEventAdapterSpec].getCanonicalName + "$" + classOf[JournalModel].getSimpleName + trait JournalModel { + def payload: Any + def tags: immutable.Set[String] + } + final case class Tagged(payload: Any, tags: immutable.Set[String]) extends JournalModel + final case class NotTagged(payload: Any) extends JournalModel { + override def tags = Set.empty + } + + final val DomainEventClassName = classOf[InmemEventAdapterSpec].getCanonicalName + "$" + classOf[DomainEvent].getSimpleName + trait DomainEvent + final case class TaggedDataChanged(tags: immutable.Set[String], value: Int) extends DomainEvent + final case class UserDataChanged(countryCode: String, age: Int) extends DomainEvent + + class UserAgeTaggingAdapter extends EventAdapter { + val Adult = Set("adult") + val Minor = Set("minor") + + override def toJournal(event: Any): Any = event match { + case e @ UserDataChanged(_, age) if age > 18 ⇒ Tagged(e, Adult) + case e @ UserDataChanged(_, age) ⇒ Tagged(e, Minor) + case e ⇒ NotTagged(e) + } + override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single { + event match { + case m: JournalModel ⇒ m.payload + } + } + + override def manifest(event: Any): String = "" + } + + class ReplayPassThroughAdapter extends UserAgeTaggingAdapter { + override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single { + event match { + case m: JournalModel ⇒ event // don't unpack, just pass through the JournalModel + } + } + } + + class LoggingAdapter(system: ExtendedActorSystem) extends EventAdapter { + final val log = Logging(system, getClass) + override def toJournal(event: Any): Any = { + log.info("On its way to the journal: []: " + event) + event + } + override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single { + log.info("On its way out from the journal: []: " + event) + event + } + + override def manifest(event: Any): String = "" + } + + class PersistAllIncomingActor(name: String, override val journalPluginId: String) + extends NamedPersistentActor(name) with PersistentActor { + + var state: List[Any] = Nil + + val persistIncoming: Receive = { + case GetState ⇒ + state.reverse.foreach { sender() ! _ } + case in ⇒ + persist(in) { e ⇒ + state ::= e + sender() ! e + } + } + + override def receiveRecover = { + case RecoveryCompleted ⇒ // ignore + case e ⇒ state ::= e + } + override def receiveCommand = persistIncoming + } + +} + +class InmemEventAdapterSpec(journalName: String, journalConfig: Config, adapterConfig: Config) + extends PersistenceSpec(journalConfig.withFallback(adapterConfig)) with ImplicitSender { + + import InmemEventAdapterSpec._ + + def this() { + this("inmem", PersistenceSpec.config("inmem", "InmemPersistentTaggingSpec"), ConfigFactory.parseString( + s""" + |akka.persistence.journal { + | + | common-event-adapters { + | age = "${classOf[InmemEventAdapterSpec].getCanonicalName}$$UserAgeTaggingAdapter" + | replay-pass-through = "${classOf[InmemEventAdapterSpec].getCanonicalName}$$ReplayPassThroughAdapter" + | } + | + | inmem { + | event-adapters = $${akka.persistence.journal.common-event-adapters} + | event-adapter-bindings { + | "${InmemEventAdapterSpec.DomainEventClassName}" = age + | "${InmemEventAdapterSpec.JournalModelClassName}" = age + | } + | } + | + | with-actor-system { + | class = $${akka.persistence.journal.inmem.class} + | dir = "journal-1" + | + | event-adapters { + | logging = "${classOf[InmemEventAdapterSpec].getCanonicalName}$$LoggingAdapter" + | } + | event-adapter-bindings { + | "java.lang.Object" = logging + | } + | } + | + | replay-pass-through-adapter-journal { + | class = $${akka.persistence.journal.inmem.class} + | dir = "journal-2" + | + | event-adapters = $${akka.persistence.journal.common-event-adapters} + | event-adapter-bindings { + | "${InmemEventAdapterSpec.JournalModelClassName}" = replay-pass-through + | "${InmemEventAdapterSpec.DomainEventClassName}" = replay-pass-through + | } + | } + | + | no-adapter { + | class = $${akka.persistence.journal.inmem.class} + | dir = "journal-3" + | } + |} + """.stripMargin)) + } + + def persister(name: String, journalId: String = journalName) = + system.actorOf(Props(classOf[PersistAllIncomingActor], name, "akka.persistence.journal." + journalId), name) + + def toJournal(in: Any, journalId: String = journalName) = + Persistence(system).adaptersFor("akka.persistence.journal." + journalId).get(in.getClass).toJournal(in) + + def fromJournal(in: Any, journalId: String = journalName) = + Persistence(system).adaptersFor("akka.persistence.journal." + journalId).get(in.getClass).fromJournal(in, "") + + "EventAdapter" must { + + "wrap with tags" in { + val event = UserDataChanged("name", 42) + toJournal(event) should equal(Tagged(event, Set("adult"))) + } + + "unwrap when reading" in { + val event = UserDataChanged("name", 42) + val tagged = Tagged(event, Set("adult")) + + toJournal(event) should equal(tagged) + fromJournal(tagged) should equal(SingleEventSeq(event)) + } + + "create adapter requiring ActorSystem" in { + val event = UserDataChanged("name", 42) + toJournal(event, "with-actor-system") should equal(event) + fromJournal(event, "with-actor-system") should equal(SingleEventSeq(event)) + } + + "store events after applying adapter" in { + val replayPassThroughJournalId = "replay-pass-through-adapter-journal" + + val p1 = persister("p1", journalId = replayPassThroughJournalId) + val m1 = UserDataChanged("name", 64) + val m2 = "hello" + p1 ! m1 + p1 ! m2 + expectMsg(m1) + expectMsg(m2) + + watch(p1) + p1 ! PoisonPill + expectTerminated(p1) + + val p11 = persister("p1", journalId = replayPassThroughJournalId) + p11 ! GetState + expectMsg(Tagged(m1, Set("adult"))) + expectMsg(m2) + } + + "work when plugin defines no adapter" in { + val p2 = persister("p2", journalId = "no-adapter") + val m1 = UserDataChanged("name", 64) + val m2 = "hello" + p2 ! m1 + p2 ! m2 + expectMsg(m1) + expectMsg(m2) + + watch(p2) + p2 ! PoisonPill + expectTerminated(p2) + + val p22 = persister("p2", "no-adapter") + p22 ! GetState + expectMsg(m1) + expectMsg(m2) + } + } + +} diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala index 0b19e9c463..f59f86b996 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala @@ -4,24 +4,19 @@ package akka.persistence -import akka.actor._ +import akka.actor.{ OneForOneStrategy, _ } import akka.persistence.journal.AsyncWriteProxy +import akka.persistence.journal.AsyncWriteTarget.{ ReplayFailure, ReplayMessages, ReplaySuccess, WriteMessages } import akka.persistence.journal.inmem.InmemStore -import akka.testkit.{ ImplicitSender, AkkaSpec } +import akka.testkit.{ ImplicitSender, TestProbe } import akka.util.Timeout -import com.typesafe.config.Config + import scala.concurrent.duration._ -import akka.persistence.journal.AsyncWriteTarget.{ ReplayFailure, ReplaySuccess, ReplayMessages, WriteMessages } import scala.language.postfixOps -import scala.Some -import akka.actor.OneForOneStrategy import scala.util.control.NoStackTrace -import akka.testkit.TestProbe object PersistentActorFailureSpec { - import PersistentActorSpec.Cmd - import PersistentActorSpec.Evt - import PersistentActorSpec.ExamplePersistentActor + import PersistentActorSpec.{ Cmd, Evt, ExamplePersistentActor } class SimulatedException(msg: String) extends RuntimeException(msg) with NoStackTrace diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/EventAdaptersSpec.scala b/akka-persistence/src/test/scala/akka/persistence/journal/EventAdaptersSpec.scala new file mode 100644 index 0000000000..64094caea3 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/journal/EventAdaptersSpec.scala @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2015 Typesafe Inc. + */ + +package akka.persistence.journal + +import akka.actor.ExtendedActorSystem +import akka.testkit.AkkaSpec +import com.typesafe.config.ConfigFactory + +class EventAdaptersSpec extends AkkaSpec { + + val config = ConfigFactory.parseString( + s""" + |akka.persistence.journal { + | plugin = "akka.persistence.journal.inmem" + | + | + | # adapters defined for all plugins + | common-event-adapter-bindings { + | } + | + | inmem { + | # showcases re-using and concating configuration of adapters + | + | event-adapters { + | example = ${classOf[ExampleEventAdapter].getCanonicalName} + | marker = ${classOf[MarkerInterfaceAdapter].getCanonicalName} + | precise = ${classOf[PreciseAdapter].getCanonicalName} + | } + | event-adapter-bindings = { + | "${classOf[EventMarkerInterface].getCanonicalName}" = marker + | "java.lang.String" = example + | "akka.persistence.journal.PreciseAdapterEvent" = precise + | } + | } + |} + """.stripMargin).withFallback(ConfigFactory.load()) + + val extendedActorSystem = system.asInstanceOf[ExtendedActorSystem] + val inmemConfig = config.getConfig("akka.persistence.journal.inmem") + + "EventAdapters" must { + "parse configuration and resolve adapter definitions" in { + val adapters = EventAdapters(extendedActorSystem, inmemConfig) + adapters.get(classOf[EventMarkerInterface]).getClass should ===(classOf[MarkerInterfaceAdapter]) + } + + "pick the most specific adapter available" in { + val adapters = EventAdapters(extendedActorSystem, inmemConfig) + + // sanity check; precise case, matching non-user classes + adapters.get(classOf[java.lang.String]).getClass should ===(classOf[ExampleEventAdapter]) + + // pick adapter by implemented marker interface + adapters.get(classOf[SampleEvent]).getClass should ===(classOf[MarkerInterfaceAdapter]) + + // more general adapter matches as well, but most specific one should be picked + adapters.get(classOf[PreciseAdapterEvent]).getClass should ===(classOf[PreciseAdapter]) + + // no adapter defined for Long, should return identity adapter + adapters.get(classOf[java.lang.Long]).getClass should ===(IdentityEventAdapter.getClass) + } + + "fail with useful message when binding to not defined adapter" in { + val badConfig = ConfigFactory.parseString( + """ + |akka.persistence.journal.inmem { + | event-adapter-bindings { + | "java.lang.Integer" = undefined-adapter + | } + |} + """.stripMargin) + + val combinedConfig = badConfig.getConfig("akka.persistence.journal.inmem") + val ex = intercept[IllegalArgumentException] { + EventAdapters(extendedActorSystem, combinedConfig) + } + + ex.getMessage should include("java.lang.Integer was bound to undefined event-adapter: undefined-adapter") + } + } + +} + +abstract class BaseTestAdapter extends EventAdapter { + override def toJournal(event: Any): Any = event + override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single(event) + override def manifest(event: Any): String = "" +} + +class ExampleEventAdapter extends BaseTestAdapter { +} +class MarkerInterfaceAdapter extends BaseTestAdapter { +} +class PreciseAdapter extends BaseTestAdapter { +} + +trait EventMarkerInterface +final case class SampleEvent() extends EventMarkerInterface +final case class PreciseAdapterEvent() extends EventMarkerInterface + diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/leveldb/SharedLeveldbJournalSpec.scala b/akka-persistence/src/test/scala/akka/persistence/journal/leveldb/SharedLeveldbJournalSpec.scala index 288de4791d..e270eba6e7 100644 --- a/akka-persistence/src/test/scala/akka/persistence/journal/leveldb/SharedLeveldbJournalSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/journal/leveldb/SharedLeveldbJournalSpec.scala @@ -62,9 +62,8 @@ object SharedLeveldbJournalSpec { case m ⇒ p forward m } - override def preStart(): Unit = { + override def preStart(): Unit = context.actorSelection(storePath) ! Identify(1) - } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala index b0ef23e013..e47a1c36f6 100644 --- a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala @@ -4,20 +4,17 @@ package akka.persistence.serialization -import scala.collection.immutable -import com.typesafe.config._ import akka.actor._ +import akka.persistence.AtLeastOnceDelivery.{ AtLeastOnceDeliverySnapshot, UnconfirmedDelivery } import akka.persistence._ import akka.serialization._ import akka.testkit._ -import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot -import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery import akka.util.ByteString.UTF_8 -import scala.concurrent.Await -import scala.concurrent.duration.Duration +import com.typesafe.config._ import org.apache.commons.codec.binary.Hex.decodeHex -import SerializerSpecConfigs._ +import scala.concurrent.Await +import scala.concurrent.duration.Duration object SerializerSpecConfigs { val customSerializers = ConfigFactory.parseString( @@ -66,7 +63,7 @@ object SerializerSpecConfigs { } -import SerializerSpecConfigs._ +import akka.persistence.serialization.SerializerSpecConfigs._ class SnapshotSerializerPersistenceSpec extends AkkaSpec(customSerializers) { val serialization = SerializationExtension(system) @@ -148,7 +145,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { "A message serializer" when { "not given a manifest" must { "handle custom Persistent message serialization" in { - val persistent = PersistentRepr(MyPayload("a"), 13, "p1", true) + val persistent = PersistentRepr(MyPayload("a"), 13, "p1", "", true) val serializer = serialization.findSerializerFor(persistent) val bytes = serializer.toBinary(persistent) @@ -160,7 +157,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { "given a PersistentRepr manifest" must { "handle custom Persistent message serialization" in { - val persistent = PersistentRepr(MyPayload("b"), 13, "p1", true) + val persistent = PersistentRepr(MyPayload("b"), 13, "p1", "", true) val serializer = serialization.findSerializerFor(persistent) val bytes = serializer.toBinary(persistent) @@ -172,7 +169,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { "given payload serializer with string manifest" must { "handle serialization" in { - val persistent = PersistentRepr(MyPayload2("a", 17), 13, "p1", true) + val persistent = PersistentRepr(MyPayload2("a", 17), 13, "p1", "", true) val serializer = serialization.findSerializerFor(persistent) val bytes = serializer.toBinary(persistent) @@ -195,7 +192,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { } "be able to deserialize data when class is removed" in { - val serializer = serialization.findSerializerFor(PersistentRepr("x", 13, "p1", true)) + val serializer = serialization.findSerializerFor(PersistentRepr("x", 13, "p1", "", true)) // It was created with: // val old = PersistentRepr(OldPayload('A'), 13, "p1", true, testActor) @@ -237,7 +234,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { "31393337" val bytes = decodeHex(oldData.toCharArray) - val expected = PersistentRepr(MyPayload(".a."), 13, "p1", true, Actor.noSender) + val expected = PersistentRepr(MyPayload(".a."), 13, "p1", "", true, Actor.noSender) val serializer = serialization.findSerializerFor(expected) val deserialized = serializer.fromBinary(bytes, None).asInstanceOf[PersistentRepr] deserialized.sender should not be (null) diff --git a/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala b/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala index 589e51ddb6..bc15b4b832 100644 --- a/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala +++ b/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala @@ -24,7 +24,7 @@ object PersistentPublisherSpec { } } -class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "ViewProducerSpec", serialization = "off")) with PersistenceSpec { +class PersistentPublisherSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "ViewProducerSpec", serialization = "off")) { import PersistentPublisherSpec._ val numMessages = 10 diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 3dfbbb19af..78c5277801 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -36,6 +36,11 @@ object Dependencies { // TODO remove with metrics from akka-cluster val sigar = "org.fusesource" % "sigar" % "1.6.4" // ApacheV2 + object Docs { + val sprayJson = "io.spray" %% "spray-json" % "1.3.2" % "test" + val gson = "com.google.code.gson" % "gson" % "2.3.1" % "test" + } + object Test { val commonsMath = "org.apache.commons" % "commons-math" % "2.2" % "test" // ApacheV2 val commonsIo = "commons-io" % "commons-io" % "2.4" % "test" // ApacheV2 @@ -111,7 +116,7 @@ object Dependencies { val osgi = l ++= Seq(osgiCore, osgiCompendium, Test.logback, Test.commonsIo, Test.pojosr, Test.tinybundles, Test.scalatest.value, Test.junit) - val docs = l ++= Seq(Test.scalatest.value, Test.junit, Test.junitIntf) + val docs = l ++= Seq(Test.scalatest.value, Test.junit, Test.junitIntf, Docs.sprayJson, Docs.gson) val contrib = l ++= Seq(Test.junitIntf, Test.commonsIo) }