Introduces EventAdapter (#17579, #17617)

- per-plugin scoped adapters
  - can be swapped during runtime
- EventAdapter now has a manifest and is configurable à la serializers
- JSON examples in docs
  - including a "completely manual" example in case one wants to add
    metadata to the persisted event
- better error reporting when bindings are misconfigured
- manifest is handled by the in-memory plugin
  - did not check if it works with the LevelDB plugin yet
- TODO: the JSON example uses Gson, as that's simplest to do, can we use
- allows 1:n adapters; multiple adapters can be bound to 1 class
Konrad Malawski 2015-05-29 18:20:51 +02:00
parent 0214d6e14d
commit 7e86dac542
28 changed files with 1534 additions and 119 deletions


@ -0,0 +1,32 @@
/*
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.persistence;
import akka.persistence.journal.EventAdapter;
import akka.persistence.journal.EventSeq;
public class PersistenceEventAdapterDocTest {

  @SuppressWarnings("unused")
  static
  //#identity-event-adapter
  class MyEventAdapter extends EventAdapter {
    @Override
    public String manifest(Object event) {
      return ""; // if no manifest needed, return ""
    }

    @Override
    public Object toJournal(Object event) {
      return event; // identity
    }

    @Override
    public EventSeq fromJournal(Object event, String manifest) {
      return EventSeq.single(event); // identity
    }
  }
  //#identity-event-adapter
}


@ -256,7 +256,7 @@ The ordering between events is still guaranteed ("evt-b-1" will be sent after "e
.. _defer-java:

Deferring actions until preceding persist handlers have executed
----------------------------------------------------------------

Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of
''happens-after the previous ``persistAsync`` handlers have been invoked''. ``PersistentActor`` provides a utility method
@ -503,6 +503,54 @@ not accept more messages and it will throw ``AtLeastOnceDelivery.MaxUnconfirmedM
The default value can be configured with the ``akka.persistence.at-least-once-delivery.max-unconfirmed-messages``
configuration key. The method can be overridden by implementation classes to return non-default values.
.. _event-adapters-java:

Event Adapters
==============

.. note::
  Complete documentation featuring use-cases and implementation examples for this feature will follow shortly.

In long running projects using event sourcing, sometimes the need arises to detach the data model from the domain model
completely.

Event Adapters help in situations where:

- **Version Migration**: existing events stored in *Version 1* should be "upcasted" to a new *Version 2* representation,
  and the process of doing so involves actual code, not just changes on the serialization layer. For these scenarios
  the ``toJournal`` function is usually an identity function, however ``fromJournal`` is implemented as
  ``v1.Event=>v2.Event``, performing the necessary mapping inside the ``fromJournal`` method.
  This technique is sometimes referred to as "upcasting" in other CQRS libraries.
- **Separating Domain and Data models**: thanks to EventAdapters it is possible to completely separate the domain model
  from the model used to persist data in the Journals. For example, one may want to use case classes in the
  domain model, but persist their protocol-buffer (or any other binary serialization format) counterparts to the Journal.
  A simple ``toJournal:MyModel=>MyDataModel`` and ``fromJournal:MyDataModel=>MyModel`` adapter can be used to implement this feature.
- **Journal Specialized Data Types**: exposing data types understood by the underlying Journal. For example, for data stores which
  understand JSON it is possible to write an EventAdapter ``toJournal:Any=>JSON`` such that the Journal can *directly* store the
  JSON instead of serializing the object to its binary representation.

Implementing an EventAdapter is rather straightforward:

.. includecode:: code/docs/persistence/PersistenceEventAdapterDocTest.java#identity-event-adapter

Then, in order for it to be used on events coming to and from the journal, you must bind it using the configuration syntax below:

.. includecode:: ../scala/code/docs/persistence/PersistenceEventAdapterDocSpec.scala#event-adapters-config

It is possible to bind multiple adapters to one class *for recovery*, in which case the ``fromJournal`` methods of all
bound adapters will be applied to a given matching event (in the order of definition in the configuration). Since each adapter may
return from ``0`` to ``n`` adapted events (returned as an ``EventSeq``), each adapter can investigate the event and, if it should
indeed adapt it, return the adapted event(s) for it; other adapters which do not have anything to contribute during this
adaptation simply return ``EventSeq.empty``. The adapted events are then delivered in-order to the ``PersistentActor`` during replay.

.. note::
  More advanced techniques utilising binary serialization formats such as protocol buffers or kryo / thrift / avro
  will be documented very soon. These schema evolutions often need to reach into the serialization layer, however
  they are much more powerful in terms of flexibly removing unused/deprecated classes from your classpath etc.

Storage plugins
===============


@ -0,0 +1,233 @@
/*
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.persistence
import akka.actor.{ ExtendedActorSystem, Props }
import akka.persistence.journal.{ EventAdapter, EventSeq }
import akka.persistence.{ PersistentActor, RecoveryCompleted }
import akka.testkit.{ AkkaSpec, TestProbe }
import com.google.gson.{ Gson, JsonElement }
import scala.collection.immutable
class PersistenceEventAdapterDocSpec(config: String) extends AkkaSpec(config) {

  def this() {
    this("""
      akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

      //#event-adapters-config
      akka.persistence.journal {
        inmem {
          event-adapters {
            tagging        = "docs.persistence.MyTaggingEventAdapter"
            user-upcasting = "docs.persistence.UserUpcastingEventAdapter"
            item-upcasting = "docs.persistence.ItemUpcastingEventAdapter"
          }
          event-adapter-bindings {
            "docs.persistence.Item"        = tagging
            "docs.persistence.TaggedEvent" = tagging
            "docs.persistence.v1.Event"    = [user-upcasting, item-upcasting]
          }
        }
      }
      //#event-adapters-config

      akka.persistence.journal {
        auto-json-store {
          class = "akka.persistence.journal.inmem.InmemJournal" # reuse inmem, as an example
          event-adapters {
            auto-json = "docs.persistence.MyAutoJsonEventAdapter"
          }
          event-adapter-bindings {
            "docs.persistence.DomainEvent" = auto-json # to journal
            "com.google.gson.JsonElement"  = auto-json # from journal
          }
        }

        manual-json-store {
          class = "akka.persistence.journal.inmem.InmemJournal" # reuse inmem, as an example
          event-adapters {
            manual-json = "docs.persistence.MyManualJsonEventAdapter"
          }
          event-adapter-bindings {
            "docs.persistence.DomainEvent" = manual-json # to journal
            "com.google.gson.JsonElement"  = manual-json # from journal
          }
        }
      }
      """)
  }
  "MyAutomaticJsonEventAdapter" must {
    "demonstrate how to implement a JSON adapter" in {
      val p = TestProbe()

      val props = Props(new PersistentActor {
        override def persistenceId: String = "json-actor"
        override def journalPluginId: String = "akka.persistence.journal.auto-json-store"

        override def receiveRecover: Receive = {
          case RecoveryCompleted => // ignore...
          case e                 => p.ref ! e
        }
        override def receiveCommand: Receive = {
          case c => persist(c) { e => p.ref ! e }
        }
      })

      val p1 = system.actorOf(props)
      val m1 = Person("Caplin", 42)
      val m2 = Box(13)
      p1 ! m1
      p1 ! m2
      p.expectMsg(m1)
      p.expectMsg(m2)

      val p2 = system.actorOf(props)
      p.expectMsg(m1)
      p.expectMsg(m2)
    }
  }

  "MyManualJsonEventAdapter" must {
    "demonstrate how to implement a JSON adapter" in {
      val p = TestProbe()

      val props = Props(new PersistentActor {
        override def persistenceId: String = "json-actor"
        override def journalPluginId: String = "akka.persistence.journal.manual-json-store"

        override def receiveRecover: Receive = {
          case RecoveryCompleted => // ignore...
          case e                 => p.ref ! e
        }
        override def receiveCommand: Receive = {
          case c => persist(c) { e => p.ref ! e }
        }
      })

      val p1 = system.actorOf(props)
      val m1 = Person("Caplin", 42)
      val m2 = Box(13)
      p1 ! m1
      p1 ! m2
      p.expectMsg(m1)
      p.expectMsg(m2)

      val p2 = system.actorOf(props)
      p.expectMsg(m1)
      p.expectMsg(m2)
    }
  }
}
trait DomainEvent
case class Person(name: String, age: Int) extends DomainEvent
case class Box(length: Int) extends DomainEvent

case class MyTaggingJournalModel(payload: Any, tags: immutable.Set[String])

//#identity-event-adapter
class MyEventAdapter(system: ExtendedActorSystem) extends EventAdapter {
  override def manifest(event: Any): String =
    "" // when no manifest needed, return ""

  override def toJournal(event: Any): Any =
    event // identity

  override def fromJournal(event: Any, manifest: String): EventSeq =
    EventSeq.single(event) // identity
}
//#identity-event-adapter
/**
 * This is an example adapter which completely takes care of domain<->json translation.
 * It allows the journal to take care of the manifest handling, which is the FQN of the serialized class.
 */
class MyAutoJsonEventAdapter(system: ExtendedActorSystem) extends EventAdapter {
  private val gson = new Gson

  override def manifest(event: Any): String =
    event.getClass.getCanonicalName

  override def toJournal(event: Any): Any = gson.toJsonTree(event)

  override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single {
    event match {
      case json: JsonElement =>
        val clazz = system.dynamicAccess.getClassFor[Any](manifest).get
        gson.fromJson(json, clazz)
    }
  }
}

class MyUpcastingEventAdapter(system: ExtendedActorSystem) extends EventAdapter {
  override def manifest(event: Any): String = ""
  override def toJournal(event: Any): Any = ???
  override def fromJournal(event: Any, manifest: String): EventSeq = ???
}

/**
 * This is an example adapter which completely takes care of domain<->json translation.
 * It manually takes care of smuggling through the manifest, even if the journal does not do anything in this regard,
 * which is the case in this example since we're persisting to the inmem journal, which does nothing in terms of manifest.
 */
class MyManualJsonEventAdapter(system: ExtendedActorSystem) extends EventAdapter {
  private val gson = new Gson

  override def manifest(event: Any): String = event.getClass.getCanonicalName

  override def toJournal(event: Any): Any = {
    val out = gson.toJsonTree(event).getAsJsonObject

    // optionally can include manifest in the adapted event;
    // some journals will store the manifest transparently
    out.addProperty("_manifest", manifest(event))

    out
  }

  override def fromJournal(event: Any, m: String): EventSeq = event match {
    case json: JsonElement =>
      val manifest = json.getAsJsonObject.get("_manifest").getAsString
      val clazz = system.dynamicAccess.getClassFor[Any](manifest).get
      EventSeq.single(gson.fromJson(json, clazz))
  }
}

class MyTaggingEventAdapter(system: ExtendedActorSystem) extends EventAdapter {
  override def manifest(event: Any): String = ""

  override def fromJournal(event: Any, manifest: String): EventSeq = event match {
    case j: MyTaggingJournalModel => EventSeq.single(j)
  }

  override def toJournal(event: Any): Any = {
    event match {
      case Person(_, age) if age >= 18 => MyTaggingJournalModel(event, tags = Set("adult"))
      case Person(_, age)              => MyTaggingJournalModel(event, tags = Set("minor"))
      case _                           => MyTaggingJournalModel(event, tags = Set.empty)
    }
  }
}

object v1 {
  trait Event
  trait UserEvent extends v1.Event
  trait ItemEvent extends v1.Event
}


@ -92,6 +92,9 @@ object SharedLeveldbPluginDocSpec {
//#shared-store-config
akka.persistence.journal.leveldb-shared.store.dir = "target/shared"
//#shared-store-config
//#event-adapter-config
akka.persistence.journal.leveldb-shared.adapter = "com.example.MyAdapter"
//#event-adapter-config
"""
//#shared-store-usage


@ -494,6 +494,54 @@ not accept more messages and it will throw ``AtLeastOnceDelivery.MaxUnconfirmedM
The default value can be configured with the ``akka.persistence.at-least-once-delivery.max-unconfirmed-messages``
configuration key. The method can be overridden by implementation classes to return non-default values.
.. _event-adapters-scala:

Event Adapters
==============

.. note::
  Complete documentation featuring use-cases and implementation examples for this feature will follow shortly.

In long running projects using event sourcing, sometimes the need arises to detach the data model from the domain model
completely.

Event Adapters help in situations where:

- **Version Migrations**: existing events stored in *Version 1* should be "upcasted" to a new *Version 2* representation,
  and the process of doing so involves actual code, not just changes on the serialization layer. For these scenarios
  the ``toJournal`` function is usually an identity function, however ``fromJournal`` is implemented as
  ``v1.Event=>v2.Event``, performing the necessary mapping inside the ``fromJournal`` method.
  This technique is sometimes referred to as "upcasting" in other CQRS libraries.
- **Separating Domain and Data models**: thanks to EventAdapters it is possible to completely separate the domain model
  from the model used to persist data in the Journals. For example, one may want to use case classes in the
  domain model, but persist their protocol-buffer (or any other binary serialization format) counterparts to the Journal.
  A simple ``toJournal:MyModel=>MyDataModel`` and ``fromJournal:MyDataModel=>MyModel`` adapter can be used to implement this feature.
- **Journal Specialized Data Types**: exposing data types understood by the underlying Journal. For example, for data stores which
  understand JSON it is possible to write an EventAdapter ``toJournal:Any=>JSON`` such that the Journal can *directly* store the
  JSON instead of serializing the object to its binary representation.

Implementing an EventAdapter is rather straightforward:

.. includecode:: code/docs/persistence/PersistenceEventAdapterDocSpec.scala#identity-event-adapter

Then, in order for it to be used on events coming to and from the journal, you must bind it using the configuration syntax below:

.. includecode:: code/docs/persistence/PersistenceEventAdapterDocSpec.scala#event-adapters-config

It is possible to bind multiple adapters to one class *for recovery*, in which case the ``fromJournal`` methods of all
bound adapters will be applied to a given matching event (in the order of definition in the configuration). Since each adapter may
return from ``0`` to ``n`` adapted events (returned as an ``EventSeq``), each adapter can investigate the event and, if it should
indeed adapt it, return the adapted event(s) for it; other adapters which do not have anything to contribute during this
adaptation simply return ``EventSeq.empty``. The adapted events are then delivered in-order to the ``PersistentActor`` during replay.
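The recovery flow described above can be sketched with plain stand-in types. The ``Adapter`` interface and ``adapt`` helper below are illustrative stand-ins, not Akka's actual internals: each bound adapter is asked in configuration order, and the event sequences they return are concatenated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MultiAdapterSketch {
  // stand-in for akka.persistence.journal.EventAdapter's fromJournal side;
  // a List<Object> plays the role of EventSeq
  interface Adapter {
    List<Object> fromJournal(Object event);
  }

  record V1Renamed(String name) {}
  record V2Renamed(String firstName, String lastName) {}

  // v1 -> v2 upcaster: contributes only for events it understands
  static final Adapter renameUpcaster = event -> {
    if (event instanceof V1Renamed v1) {
      String[] parts = v1.name().split(" ", 2);
      return List.of(new V2Renamed(parts[0], parts.length > 1 ? parts[1] : ""));
    }
    return List.of(); // EventSeq.empty: nothing to contribute
  };

  static final Adapter noOpAdapter = event -> List.of();

  // apply all bound adapters in configuration order, concatenating results
  static List<Object> adapt(List<Adapter> adapters, Object stored) {
    List<Object> out = new ArrayList<>();
    for (Adapter a : adapters) out.addAll(a.fromJournal(stored));
    return out;
  }

  public static void main(String[] args) {
    System.out.println(adapt(Arrays.asList(noOpAdapter, renameUpcaster),
        new V1Renamed("Ada Lovelace")));
    // prints [V2Renamed[firstName=Ada, lastName=Lovelace]]
  }
}
```

The no-op adapter illustrates the ``EventSeq.empty`` case: it is consulted but contributes no events, so only the upcaster's output reaches the replaying actor.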
.. note::
  More advanced techniques utilising binary serialization formats such as protocol buffers or kryo / thrift / avro
  will be documented very soon. These schema evolutions often need to reach into the serialization layer, however
  they are much more powerful in terms of flexibly removing unused/deprecated classes from your classpath etc.

.. _persistent-fsm:

Persistent FSM


@ -46,7 +46,7 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {
extension.journalFor(null)

def replayedMessage(snr: Long, deleted: Boolean = false, confirms: Seq[String] = Nil): ReplayedMessage =
  ReplayedMessage(PersistentImpl(s"a-${snr}", snr, pid, "", deleted, Actor.noSender))

def writeMessages(from: Int, to: Int, pid: String, sender: ActorRef): Unit = {
  val msgs = from to to map { i ⇒ PersistentRepr(payload = s"a-${i}", sequenceNr = i, persistenceId = pid, sender = sender) }
@ -56,7 +56,7 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {
  probe.expectMsg(WriteMessagesSuccessful)
  from to to foreach { i ⇒
    probe.expectMsgPF() { case WriteMessageSuccess(PersistentImpl(payload, `i`, `pid`, _, _, `sender`), _) ⇒ payload should be(s"a-${i}") }
  }
}


@ -98,6 +98,21 @@ public final class MessageFormats {
*/
com.google.protobuf.ByteString
getSenderBytes();
// optional string manifest = 12;
/**
* <code>optional string manifest = 12;</code>
*/
boolean hasManifest();
/**
* <code>optional string manifest = 12;</code>
*/
java.lang.String getManifest();
/**
* <code>optional string manifest = 12;</code>
*/
com.google.protobuf.ByteString
getManifestBytes();
}
/**
* Protobuf type {@code PersistentMessage}
@ -183,6 +198,11 @@ public final class MessageFormats {
sender_ = input.readBytes();
break;
}
case 98: {
bitField0_ |= 0x00000020;
manifest_ = input.readBytes();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -387,12 +407,56 @@ public final class MessageFormats {
}
}
// optional string manifest = 12;
public static final int MANIFEST_FIELD_NUMBER = 12;
private java.lang.Object manifest_;
/**
* <code>optional string manifest = 12;</code>
*/
public boolean hasManifest() {
return ((bitField0_ & 0x00000020) == 0x00000020);
}
/**
* <code>optional string manifest = 12;</code>
*/
public java.lang.String getManifest() {
java.lang.Object ref = manifest_;
if (ref instanceof java.lang.String) {
return (java.lang.String) ref;
} else {
com.google.protobuf.ByteString bs =
(com.google.protobuf.ByteString) ref;
java.lang.String s = bs.toStringUtf8();
if (bs.isValidUtf8()) {
manifest_ = s;
}
return s;
}
}
/**
* <code>optional string manifest = 12;</code>
*/
public com.google.protobuf.ByteString
getManifestBytes() {
java.lang.Object ref = manifest_;
if (ref instanceof java.lang.String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(java.lang.String) ref);
manifest_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
private void initFields() {
payload_ = akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance();
sequenceNr_ = 0L;
persistenceId_ = "";
deleted_ = false;
sender_ = "";
manifest_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -427,6 +491,9 @@ public final class MessageFormats {
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeBytes(11, getSenderBytes());
}
if (((bitField0_ & 0x00000020) == 0x00000020)) {
output.writeBytes(12, getManifestBytes());
}
getUnknownFields().writeTo(output);
}
@ -456,6 +523,10 @@ public final class MessageFormats {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(11, getSenderBytes());
}
if (((bitField0_ & 0x00000020) == 0x00000020)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(12, getManifestBytes());
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -587,6 +658,8 @@ public final class MessageFormats {
bitField0_ = (bitField0_ & ~0x00000008);
sender_ = "";
bitField0_ = (bitField0_ & ~0x00000010);
manifest_ = "";
bitField0_ = (bitField0_ & ~0x00000020);
return this;
}
@ -639,6 +712,10 @@ public final class MessageFormats {
to_bitField0_ |= 0x00000010;
}
result.sender_ = sender_;
if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
to_bitField0_ |= 0x00000020;
}
result.manifest_ = manifest_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -674,6 +751,11 @@ public final class MessageFormats {
sender_ = other.sender_;
onChanged();
}
if (other.hasManifest()) {
bitField0_ |= 0x00000020;
manifest_ = other.manifest_;
onChanged();
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -1086,6 +1168,80 @@ public final class MessageFormats {
return this;
}
// optional string manifest = 12;
private java.lang.Object manifest_ = "";
/**
* <code>optional string manifest = 12;</code>
*/
public boolean hasManifest() {
return ((bitField0_ & 0x00000020) == 0x00000020);
}
/**
* <code>optional string manifest = 12;</code>
*/
public java.lang.String getManifest() {
java.lang.Object ref = manifest_;
if (!(ref instanceof java.lang.String)) {
java.lang.String s = ((com.google.protobuf.ByteString) ref)
.toStringUtf8();
manifest_ = s;
return s;
} else {
return (java.lang.String) ref;
}
}
/**
* <code>optional string manifest = 12;</code>
*/
public com.google.protobuf.ByteString
getManifestBytes() {
java.lang.Object ref = manifest_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(java.lang.String) ref);
manifest_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
/**
* <code>optional string manifest = 12;</code>
*/
public Builder setManifest(
java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000020;
manifest_ = value;
onChanged();
return this;
}
/**
* <code>optional string manifest = 12;</code>
*/
public Builder clearManifest() {
bitField0_ = (bitField0_ & ~0x00000020);
manifest_ = getDefaultInstance().getManifest();
onChanged();
return this;
}
/**
* <code>optional string manifest = 12;</code>
*/
public Builder setManifestBytes(
com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000020;
manifest_ = value;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:PersistentMessage)
}
@ -3900,21 +4056,22 @@ public final class MessageFormats {
descriptor;
static {
java.lang.String[] descriptorData = {
"\n\024MessageFormats.proto\"\226\001\n\021PersistentMes" +
"sage\022#\n\007payload\030\001 \001(\0132\022.PersistentPayloa" +
"d\022\022\n\nsequenceNr\030\002 \001(\003\022\025\n\rpersistenceId\030\003" +
" \001(\t\022\017\n\007deleted\030\004 \001(\010\022\016\n\006sender\030\013 \001(\t\022\020\n" +
"\010manifest\030\014 \001(\t\"S\n\021PersistentPayload\022\024\n\014" +
"serializerId\030\001 \002(\005\022\017\n\007payload\030\002 \002(\014\022\027\n\017p" +
"ayloadManifest\030\003 \001(\014\"\356\001\n\033AtLeastOnceDeli" +
"verySnapshot\022\031\n\021currentDeliveryId\030\001 \002(\003\022" +
"O\n\025unconfirmedDeliveries\030\002 \003(\01320.AtLeast" +
"OnceDeliverySnapshot.UnconfirmedDelivery",
"\032c\n\023UnconfirmedDelivery\022\022\n\ndeliveryId\030\001 " +
"\002(\003\022\023\n\013destination\030\002 \002(\t\022#\n\007payload\030\003 \002(" +
"\0132\022.PersistentPayload\"F\n\032PersistentState" +
"ChangeEvent\022\027\n\017stateIdentifier\030\001 \002(\t\022\017\n\007" +
"timeout\030\002 \001(\tB\"\n\036akka.persistence.serial" +
"izationH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -3926,7 +4083,7 @@ public final class MessageFormats {
internal_static_PersistentMessage_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_PersistentMessage_descriptor,
new java.lang.String[] { "Payload", "SequenceNr", "PersistenceId", "Deleted", "Sender", "Manifest", });
internal_static_PersistentPayload_descriptor =
getDescriptor().getMessageTypes().get(1);
internal_static_PersistentPayload_fieldAccessorTable = new


@ -16,6 +16,7 @@ message PersistentMessage {
// optional DeliveredMessage confirmMessage = 9; // Removed in 2.4
// optional string confirmTarget = 10; // Removed in 2.4
optional string sender = 11; // not stored in journal, needed for remote serialization
optional string manifest = 12;
}

message PersistentPayload {


@ -4,10 +4,10 @@
package akka.persistence

import akka.actor._
import scala.collection.immutable

/**
 * INTERNAL API.
 *


@ -4,16 +4,17 @@
package akka.persistence

import java.util.concurrent.atomic.AtomicReference

import akka.actor._
import akka.dispatch.Dispatchers
import akka.event.{ Logging, LoggingAdapter }
import akka.persistence.journal.{ AsyncWriteJournal, EventAdapters, IdentityEventAdapters }
import akka.util.Helpers.ConfigOps
import com.typesafe.config.Config

import scala.annotation.tailrec
import scala.concurrent.duration._
/**
 * Persistence configuration.
@ -110,7 +111,7 @@ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider {
def createExtension(system: ExtendedActorSystem): Persistence = new Persistence(system)

def lookup() = Persistence

/** INTERNAL API. */
private[persistence] case class PluginHolder(actor: ActorRef, adapters: EventAdapters) extends Extension
}

/**
@ -142,7 +143,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
val settings = new PersistenceSettings(config)

private def journalDispatchSelector(klaz: Class[_]): String =
  if (classOf[AsyncWriteJournal].isAssignableFrom(klaz)) Dispatchers.DefaultDispatcherId else DefaultPluginDispatcherId // TODO sure this is not inverted?

private def snapshotDispatchSelector(klaz: Class[_]): String =
  DefaultPluginDispatcherId
@ -156,6 +157,43 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
/** Discovered persistence snapshot store plugins. */
private val snapshotPluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty)
/**
* Returns an [[EventAdapters]] object which serves as a per-journal collection of bound event adapters.
* If no adapters are registered for a given journal the EventAdapters object will simply return the identity
* adapter for each class, otherwise the most specific adapter matching a given class will be returned.
*/
@tailrec final def adaptersFor(journalPluginId: String): EventAdapters = {
val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId
val extensionIdMap = journalPluginExtensionId.get
extensionIdMap.get(configPath) match {
case Some(extensionId)
extensionId(system).adapters
case None
val extensionId = new ExtensionId[PluginHolder] {
override def createExtension(system: ExtendedActorSystem): PluginHolder = {
val plugin = createPlugin(configPath)(journalDispatchSelector)
val adapters = createAdapters(configPath)
PluginHolder(plugin, adapters)
}
}
journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
adaptersFor(journalPluginId) // Recursive invocation.
}
}
/**
* INTERNAL API
* Looks up [[EventAdapters]] by journal plugin's ActorRef.
*/
private[akka] final def adaptersFor(journalPluginActor: ActorRef): EventAdapters = {
journalPluginExtensionId.get().values collectFirst {
case ext if ext(system).actor == journalPluginActor ⇒ ext(system).adapters
} match {
case Some(adapters) ⇒ adapters
case _ ⇒ IdentityEventAdapters
}
}
/** /**
* Returns a journal plugin actor identified by `journalPluginId`. * Returns a journal plugin actor identified by `journalPluginId`.
* When empty, looks in `akka.persistence.journal.plugin` to find configuration entry path. * When empty, looks in `akka.persistence.journal.plugin` to find configuration entry path.
@ -170,8 +208,11 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
extensionId(system).actor extensionId(system).actor
case None ⇒ case None ⇒
val extensionId = new ExtensionId[PluginHolder] { val extensionId = new ExtensionId[PluginHolder] {
override def createExtension(system: ExtendedActorSystem): PluginHolder = override def createExtension(system: ExtendedActorSystem): PluginHolder = {
PluginHolder(createPlugin(configPath)(journalDispatchSelector)) val plugin = createPlugin(configPath)(journalDispatchSelector)
val adapters = createAdapters(configPath)
PluginHolder(plugin, adapters)
}
} }
journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
journalFor(journalPluginId) // Recursive invocation. journalFor(journalPluginId) // Recursive invocation.
@ -192,8 +233,11 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
extensionId(system).actor extensionId(system).actor
case None ⇒ case None ⇒
val extensionId = new ExtensionId[PluginHolder] { val extensionId = new ExtensionId[PluginHolder] {
override def createExtension(system: ExtendedActorSystem): PluginHolder = override def createExtension(system: ExtendedActorSystem): PluginHolder = {
PluginHolder(createPlugin(configPath)(snapshotDispatchSelector)) val plugin = createPlugin(configPath)(snapshotDispatchSelector)
val adapters = createAdapters(configPath)
PluginHolder(plugin, adapters)
}
} }
snapshotPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) snapshotPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
snapshotStoreFor(snapshotPluginId) // Recursive invocation. snapshotStoreFor(snapshotPluginId) // Recursive invocation.
@ -202,12 +246,12 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
private def createPlugin(configPath: String)(dispatcherSelector: Class[_] ⇒ String) = { private def createPlugin(configPath: String)(dispatcherSelector: Class[_] ⇒ String) = {
require(!isEmpty(configPath) && system.settings.config.hasPath(configPath), require(!isEmpty(configPath) && system.settings.config.hasPath(configPath),
s"'reference.conf' is missing persistence plugin config path: '${configPath}'") s"'reference.conf' is missing persistence plugin config path: '$configPath'")
val pluginActorName = configPath val pluginActorName = configPath
val pluginConfig = system.settings.config.getConfig(configPath) val pluginConfig = system.settings.config.getConfig(configPath)
val pluginClassName = pluginConfig.getString("class") val pluginClassName = pluginConfig.getString("class")
log.debug(s"Create plugin: ${pluginActorName} ${pluginClassName}") log.debug(s"Create plugin: $pluginActorName $pluginClassName")
val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get val pluginClass = system.dynamicAccess.getClassFor[Any](pluginClassName).get
val pluginInjectConfig = if (pluginConfig.hasPath("inject-config")) pluginConfig.getBoolean("inject-config") else false val pluginInjectConfig = if (pluginConfig.hasPath("inject-config")) pluginConfig.getBoolean("inject-config") else false
val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else dispatcherSelector(pluginClass) val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else dispatcherSelector(pluginClass)
val pluginActorArgs = if (pluginInjectConfig) List(pluginConfig) else Nil val pluginActorArgs = if (pluginInjectConfig) List(pluginConfig) else Nil
@ -215,6 +259,11 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
system.systemActorOf(pluginActorProps, pluginActorName) system.systemActorOf(pluginActorProps, pluginActorName)
} }
private def createAdapters(configPath: String): EventAdapters = {
val pluginConfig = system.settings.config.getConfig(configPath)
EventAdapters(system, pluginConfig)
}
/** Creates a canonical persistent actor id from a persistent actor ref. */ /** Creates a canonical persistent actor id from a persistent actor ref. */
def persistenceId(persistentActor: ActorRef): String = id(persistentActor) def persistenceId(persistentActor: ActorRef): String = id(persistentActor)
View file
@ -7,10 +7,7 @@ package akka.persistence
import java.lang.{ Iterable JIterable } import java.lang.{ Iterable JIterable }
import java.util.{ List JList } import java.util.{ List JList }
import scala.collection.immutable
import akka.actor.{ ActorContext, ActorRef } import akka.actor.{ ActorContext, ActorRef }
import akka.japi.Util.immutableSeq
import akka.pattern.PromiseActorRef import akka.pattern.PromiseActorRef
import akka.persistence.serialization.Message import akka.persistence.serialization.Message
@ -40,13 +37,17 @@ private[persistence] final case class NonPersistentRepr(payload: Any, sender: Ac
* @see [[akka.persistence.journal.AsyncRecovery]] * @see [[akka.persistence.journal.AsyncRecovery]]
*/ */
trait PersistentRepr extends PersistentEnvelope with Message { trait PersistentRepr extends PersistentEnvelope with Message {
import scala.collection.JavaConverters._
/** /**
* This persistent message's payload. * This persistent message's payload.
*/ */
def payload: Any def payload: Any
/**
* Returns the persistent payload's manifest if available
*/
def manifest: String
/** /**
* Persistent id that journals a persistent message * Persistent id that journals a persistent message
*/ */
@ -62,6 +63,11 @@ trait PersistentRepr extends PersistentEnvelope with Message {
*/ */
def withPayload(payload: Any): PersistentRepr def withPayload(payload: Any): PersistentRepr
/**
* Creates a new persistent message with the specified `manifest`.
*/
def withManifest(manifest: String): PersistentRepr
/** /**
* `true` if this message is marked as deleted. * `true` if this message is marked as deleted.
*/ */
@ -83,10 +89,10 @@ trait PersistentRepr extends PersistentEnvelope with Message {
} }
object PersistentRepr { object PersistentRepr {
/** /** Plugin API: value of an undefined persistenceId or manifest. */
* Plugin API: value of an undefined processor id.
*/
val Undefined = "" val Undefined = ""
/** Plugin API: value of an undefined / identity event adapter. */
val UndefinedId = 0
/** /**
* Plugin API. * Plugin API.
@ -95,9 +101,10 @@ object PersistentRepr {
payload: Any, payload: Any,
sequenceNr: Long = 0L, sequenceNr: Long = 0L,
persistenceId: String = PersistentRepr.Undefined, persistenceId: String = PersistentRepr.Undefined,
manifest: String = PersistentRepr.Undefined,
deleted: Boolean = false, deleted: Boolean = false,
sender: ActorRef = null): PersistentRepr = sender: ActorRef = null): PersistentRepr =
PersistentImpl(payload, sequenceNr, persistenceId, deleted, sender) PersistentImpl(payload, sequenceNr, persistenceId, manifest, deleted, sender)
/** /**
* Java API, Plugin API. * Java API, Plugin API.
@ -115,20 +122,21 @@ object PersistentRepr {
* INTERNAL API. * INTERNAL API.
*/ */
private[persistence] final case class PersistentImpl( private[persistence] final case class PersistentImpl(
payload: Any, override val payload: Any,
sequenceNr: Long, override val sequenceNr: Long,
override val persistenceId: String, override val persistenceId: String,
deleted: Boolean, override val manifest: String,
sender: ActorRef) extends PersistentRepr { override val deleted: Boolean,
override val sender: ActorRef) extends PersistentRepr {
def withPayload(payload: Any): PersistentRepr = def withPayload(payload: Any): PersistentRepr =
copy(payload = payload) copy(payload = payload)
def update( def withManifest(manifest: String): PersistentRepr =
sequenceNr: Long, if (this.manifest == manifest) this
persistenceId: String, else copy(manifest = manifest)
deleted: Boolean,
sender: ActorRef) = def update(sequenceNr: Long, persistenceId: String, deleted: Boolean, sender: ActorRef) =
copy(sequenceNr = sequenceNr, persistenceId = persistenceId, deleted = deleted, sender = sender) copy(sequenceNr = sequenceNr, persistenceId = persistenceId, deleted = deleted, sender = sender)
} }
View file
@ -5,20 +5,20 @@
package akka.persistence.journal package akka.persistence.journal
import scala.collection.immutable
import scala.concurrent.Future
import scala.util._
import akka.actor._ import akka.actor._
import akka.pattern.pipe import akka.pattern.pipe
import akka.persistence._ import akka.persistence._
import scala.collection.immutable
import scala.concurrent.Future
import scala.util._
/** /**
* Abstract journal, optimized for asynchronous, non-blocking writes. * Abstract journal, optimized for asynchronous, non-blocking writes.
*/ */
trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
import JournalProtocol._
import AsyncWriteJournal._ import AsyncWriteJournal._
import JournalProtocol._
import context.dispatcher import context.dispatcher
private val extension = Persistence(context.system) private val extension = Persistence(context.system)
@ -47,12 +47,15 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
// Send replayed messages and replay result to persistentActor directly. No need // Send replayed messages and replay result to persistentActor directly. No need
// to resequence replayed messages relative to written and looped messages. // to resequence replayed messages relative to written and looped messages.
asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p ⇒ asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p ⇒
if (!p.deleted || replayDeleted) persistentActor.tell(ReplayedMessage(p), Actor.noSender) if (!p.deleted || replayDeleted)
adaptFromJournal(p).foreach { adaptedPersistentRepr ⇒
persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender)
}
} map { } map {
case _ ⇒ ReplayMessagesSuccess case _ ⇒ ReplayMessagesSuccess
} recover { } recover {
case e ⇒ ReplayMessagesFailure(e) case e ⇒ ReplayMessagesFailure(e)
} pipeTo (persistentActor) onSuccess { } pipeTo persistentActor onSuccess {
case _ if publish ⇒ context.system.eventStream.publish(r) case _ if publish ⇒ context.system.eventStream.publish(r)
} }
case ReadHighestSequenceNr(fromSequenceNr, persistenceId, persistentActor) ⇒ case ReadHighestSequenceNr(fromSequenceNr, persistenceId, persistentActor) ⇒
@ -62,7 +65,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
highest ⇒ ReadHighestSequenceNrSuccess(highest) highest ⇒ ReadHighestSequenceNrSuccess(highest)
} recover { } recover {
case e ⇒ ReadHighestSequenceNrFailure(e) case e ⇒ ReadHighestSequenceNrFailure(e)
} pipeTo (persistentActor) } pipeTo persistentActor
case d @ DeleteMessagesTo(persistenceId, toSequenceNr, permanent) ⇒ case d @ DeleteMessagesTo(persistenceId, toSequenceNr, permanent) ⇒
asyncDeleteMessagesTo(persistenceId, toSequenceNr, permanent) onComplete { asyncDeleteMessagesTo(persistenceId, toSequenceNr, permanent) onComplete {
case Success(_) ⇒ if (publish) context.system.eventStream.publish(d) case Success(_) ⇒ if (publish) context.system.eventStream.publish(d)
@ -85,6 +88,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
*/ */
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit]
//#journal-plugin-api //#journal-plugin-api
} }
/** /**
View file
@ -20,7 +20,7 @@ import akka.util._
* *
* A journal that delegates actual storage to a target actor. For testing only. * A journal that delegates actual storage to a target actor. For testing only.
*/ */
private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash { private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash with ActorLogging {
import AsyncWriteProxy._ import AsyncWriteProxy._
import AsyncWriteTarget._ import AsyncWriteTarget._
@ -32,7 +32,8 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash
store = ref store = ref
unstashAll() unstashAll()
context.become(initialized) context.become(initialized)
case _ ⇒ stash() case x ⇒
stash()
} }
implicit def timeout: Timeout implicit def timeout: Timeout
@ -43,8 +44,8 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] =
(store ? DeleteMessagesTo(persistenceId, toSequenceNr, permanent)).mapTo[Unit] (store ? DeleteMessagesTo(persistenceId, toSequenceNr, permanent)).mapTo[Unit]
def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) ⇒ Unit): Future[Unit] = { def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] = {
val replayCompletionPromise = Promise[Unit] val replayCompletionPromise = Promise[Unit]()
val mediator = context.actorOf(Props(classOf[ReplayMediator], replayCallback, replayCompletionPromise, timeout.duration).withDeploy(Deploy.local)) val mediator = context.actorOf(Props(classOf[ReplayMediator], replayCallback, replayCompletionPromise, timeout.duration).withDeploy(Deploy.local))
store.tell(ReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max), mediator) store.tell(ReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max), mediator)
replayCompletionPromise.future replayCompletionPromise.future
View file
@ -0,0 +1,91 @@
/*
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal
import scala.annotation.varargs
import scala.collection.immutable
/**
* Facility to convert from and to specialised data models, as may be required by specialised persistence journals.
*
* Typical use cases include (but are not limited to):
* <ul>
* <li>adding metadata, a.k.a. "tagging" - by wrapping objects into tagged counterparts</li>
* <li>manually converting to the journal's storage format, such as JSON, BSON or any specialised binary format</li>
* <li>adapting incoming events in any way before persisting them by the journal</li>
* </ul>
*/
abstract class EventAdapter {
//#event-adapter-api
/**
* Return the manifest (type hint) that will be provided in the `fromJournal` method.
* Use `""` if manifest is not needed.
*/
def manifest(event: Any): String
/**
* Convert domain event to journal event type.
*
* Some journals may require a specific type to be returned to them,
* for example if a primary key has to be associated with each event then a journal
* may require adapters to return `com.example.myjournal.EventWithPrimaryKey(event, key)`.
*
* The `toJournal` adaptation must be a 1-to-1 transformation.
* It is not allowed to drop incoming events during the `toJournal` adaptation.
*
* @param event the application-side domain event to be adapted to the journal model
* @return the adapted event object, possibly the same object if no adaptation was performed
*/
def toJournal(event: Any): Any
/**
* Convert an event from its journal model to the application's domain model.
*
* One event may be adapted into multiple (or zero) events which should be delivered to the [[akka.persistence.PersistentActor]].
* Use the specialised [[EventSeq.single]] method to emit exactly one event, or [[EventSeq.empty]] in case the adapter
* is not handling this event. Multiple [[EventAdapter]] instances are applied in order as defined in configuration
* and their emitted event seqs are concatenated and delivered in order to the PersistentActor.
*
* @param event event to be adapted before delivering to the PersistentActor
* @param manifest optionally provided manifest (type hint) in case the Adapter has stored one for this event, `""` if none
* @return sequence containing the adapted events (possibly zero) which will be delivered to the PersistentActor
*/
def fromJournal(event: Any, manifest: String): EventSeq
//#event-adapter-api
}
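For illustration (not part of this commit), a minimal write-side adapter following the API above could look like the following sketch; `UserEvent`, `Tagged`, `UserTaggingAdapter` and the `"user"` tag are hypothetical names:

```scala
// Hypothetical domain and journal-side types, for illustration only:
trait UserEvent
final case class Tagged(payload: Any, tags: Set[String])

// Wraps user events into a Tagged envelope on the way to the journal
// and unwraps them again during replay.
class UserTaggingAdapter extends EventAdapter {
  override def manifest(event: Any): String = "" // no manifest needed

  override def toJournal(event: Any): Any = event match {
    case e: UserEvent ⇒ Tagged(e, Set("user"))
    case other        ⇒ other
  }

  override def fromJournal(event: Any, manifest: String): EventSeq = event match {
    case Tagged(payload, _) ⇒ EventSeq.single(payload) // unwrap for the PersistentActor
    case other              ⇒ EventSeq.single(other)
  }
}
```

Note that `toJournal` stays strictly 1-to-1, as the contract above requires; only the read side may emit zero or many events.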
sealed abstract class EventSeq {
def events: immutable.Seq[Any]
}
object EventSeq {
/** Java API */
final def empty: EventSeq = EmptyEventSeq
/** Java API */
final def single(event: Any): EventSeq = new SingleEventSeq(event)
/** Java API */
@varargs final def create(events: Any*): EventSeq = EventsSeq(events.toList)
final def apply(events: Any*): EventSeq = EventsSeq(events.toList)
}
final case class SingleEventSeq(event: Any) extends EventSeq { // TODO try to make it a value class, would save allocations
override val events: immutable.Seq[Any] = List(event)
override def toString = s"SingleEventSeq($event)"
}
sealed trait EmptyEventSeq extends EventSeq
final object EmptyEventSeq extends EmptyEventSeq {
override def events = Nil
}
final case class EventsSeq[E](events: immutable.Seq[E]) extends EventSeq
/** No-op model adapter which passes through the incoming events as-is. */
final case object IdentityEventAdapter extends EventAdapter {
override def toJournal(event: Any): Any = event
override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single(event)
override def manifest(event: Any): String = ""
}
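A read-side adapter may also fan one stored event out into several domain events via `EventSeq`; a sketch with hypothetical `ItemsAdded`/`ItemAdded` types:

```scala
// Hypothetical stored and domain representations, for illustration only:
final case class ItemsAdded(items: List[String]) // as persisted by an older version
final case class ItemAdded(item: String)         // current domain event

class SplittingEventAdapter extends EventAdapter {
  override def manifest(event: Any): String = ""
  override def toJournal(event: Any): Any = event // writes pass through unchanged

  // During recovery, one stored ItemsAdded becomes n ItemAdded events:
  override def fromJournal(event: Any, manifest: String): EventSeq = event match {
    case ItemsAdded(items) ⇒ EventSeq(items.map(ItemAdded(_)): _*)
    case other             ⇒ EventSeq.single(other)
  }
}
```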
View file
@ -0,0 +1,156 @@
/*
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal
import java.util
import java.util.concurrent.ConcurrentHashMap
import akka.actor.ExtendedActorSystem
import akka.event.{ Logging, LoggingAdapter }
import com.typesafe.config.Config
import scala.collection.immutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.Try
/** INTERNAL API */
private[akka] class EventAdapters(
map: ConcurrentHashMap[Class[_], EventAdapter],
bindings: immutable.Seq[(Class[_], EventAdapter)],
log: LoggingAdapter) {
/**
* Finds the "most specific" matching adapter for the given class (i.e. it may return an adapter that can work on an
* interface implemented by the given class if no direct match is found).
*
* Falls back to [[IdentityEventAdapter]] if no adapter was defined for the given class.
*/
def get(clazz: Class[_]): EventAdapter = {
map.get(clazz) match {
case null ⇒ // bindings are ordered from most specific to least specific
val value = bindings filter {
_._1 isAssignableFrom clazz
} match {
case (_, bestMatch) +: _ ⇒ bestMatch
case _ ⇒ IdentityEventAdapter
}
map.putIfAbsent(clazz, value) match {
case null ⇒
log.debug(s"Using EventAdapter: {} for event [{}]", value.getClass.getName, clazz.getName)
value
case some ⇒ some
}
case value ⇒ value
}
}
override def toString =
s"${getClass.getName}($map, $bindings)"
}
/** INTERNAL API */
private[akka] object EventAdapters {
type Name = String
type BoundAdapters = immutable.Seq[String]
type FQN = String
type ClassHandler = (Class[_], EventAdapter)
def apply(system: ExtendedActorSystem, config: Config): EventAdapters = {
val adapters = configToMap(config, "event-adapters")
val adapterBindings = configToListMap(config, "event-adapter-bindings")
apply(system, adapters, adapterBindings)
}
private def apply(
system: ExtendedActorSystem,
adapters: Map[Name, FQN],
adapterBindings: Map[FQN, BoundAdapters]): EventAdapters = {
val adapterNames = adapters.keys.toSet
for {
(fqn, boundToAdapters) ← adapterBindings
boundAdapter ← boundToAdapters
} require(adapterNames(boundAdapter.toString),
s"$fqn was bound to undefined event-adapter: $boundAdapter (bindings: ${boundToAdapters.mkString("[", ", ", "]")}, known adapters: ${adapters.keys.mkString})")
// A Map from handler alias to implementation (i.e. a class implementing akka.persistence.journal.EventAdapter)
// For example this defines a handler named 'country': `"country" -> com.example.domain.CountryTagsAdapter`
val handlers = for ((k: String, v: String) ← adapters) yield k -> instantiate[EventAdapter](v, system).get
// bindings is a Seq of tuples representing the mapping from Class to handler.
// It is primarily ordered by the most specific classes first, and secondly in the configured order.
val bindings: immutable.Seq[ClassHandler] = {
val bs = for ((k: FQN, as: BoundAdapters) ← adapterBindings)
yield if (as.size == 1) (system.dynamicAccess.getClassFor[Any](k).get, handlers(as.head))
else (system.dynamicAccess.getClassFor[Any](k).get, CombinedReadEventAdapter(as.map(handlers)))
sort(bs)
}
val backing = (new ConcurrentHashMap[Class[_], EventAdapter] /: bindings) { case (map, (c, s)) ⇒ map.put(c, s); map }
new EventAdapters(backing, bindings, system.log)
}
/** INTERNAL API */
private[akka] case class CombinedReadEventAdapter(adapters: immutable.Seq[EventAdapter]) extends EventAdapter {
private def onlyReadSideException = new IllegalStateException("CombinedReadEventAdapter must not be used for writing events (or creating manifests)!")
override def manifest(event: Any): String = throw onlyReadSideException
override def toJournal(event: Any): Any = throw onlyReadSideException
override def fromJournal(event: Any, manifest: String): EventSeq =
EventSeq(adapters.flatMap(_.fromJournal(event, manifest).events): _*) // TODO: could we make EventSeq flatMappable?
override def toString =
s"CombinedReadEventAdapter(${adapters.map(_.getClass.getCanonicalName).mkString(",")})"
}
/**
* Tries to instantiate the specified EventAdapter by its fully-qualified class name; the actual
* loading is performed by the system's [[akka.actor.DynamicAccess]].
*/
private def instantiate[T: ClassTag](fqn: FQN, system: ExtendedActorSystem): Try[T] =
system.dynamicAccess.createInstanceFor[T](fqn, List(classOf[ExtendedActorSystem] -> system)) recoverWith {
case _: NoSuchMethodException ⇒ system.dynamicAccess.createInstanceFor[T](fqn, Nil)
}
/**
* Sort so that subtypes always precede their supertypes, but without
* obeying any order between unrelated subtypes (insertion sort).
*/
private def sort[T](in: Iterable[(Class[_], T)]): immutable.Seq[(Class[_], T)] =
(new ArrayBuffer[(Class[_], T)](in.size) /: in) { (buf, ca) ⇒
buf.indexWhere(_._1 isAssignableFrom ca._1) match {
case -1 ⇒ buf append ca
case x ⇒ buf insert (x, ca)
}
buf
}.to[immutable.Seq]
private final def configToMap(config: Config, path: String): Map[String, String] = {
import scala.collection.JavaConverters._
if (config.hasPath(path)) {
config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) ⇒ k -> v.toString }
} else Map.empty
}
private final def configToListMap(config: Config, path: String): Map[String, immutable.Seq[String]] = {
import scala.collection.JavaConverters._
if (config.hasPath(path)) {
config.getConfig(path).root.unwrapped.asScala.toMap map {
case (k, v: util.ArrayList[_]) ⇒ k -> v.asScala.map(_.toString).toList
case (k, v) ⇒ k -> List(v.toString)
}
} else Map.empty
}
}
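The configuration shape these helpers parse could look like the following sketch (adapter aliases and class names are made up); note that a binding value may be a single adapter name or a list, which is why `configToListMap` normalises both forms:

```scala
import com.typesafe.config.ConfigFactory

// A sketch of a journal plugin's config section, parsed by
// configToMap ("event-adapters") and configToListMap ("event-adapter-bindings"):
val journalConfig = ConfigFactory.parseString("""
  event-adapters {
    tagging   = "com.example.UserTaggingAdapter"
    splitting = "com.example.SplittingEventAdapter"
  }
  event-adapter-bindings {
    "com.example.UserEvent"  = tagging               # single adapter
    "com.example.ItemsAdded" = [tagging, splitting]  # 1:n, combined on the read side
  }
""")
```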
private[akka] case object IdentityEventAdapters extends EventAdapters(null, null, null) {
override def get(clazz: Class[_]): EventAdapter = IdentityEventAdapter
override def toString = Logging.simpleName(IdentityEventAdapters)
}
View file
@ -8,14 +8,14 @@ package akka.persistence.journal
import scala.collection.immutable import scala.collection.immutable
import scala.util._ import scala.util._
import akka.actor.Actor import akka.actor.{ ActorLogging, Actor }
import akka.pattern.pipe import akka.pattern.pipe
import akka.persistence._ import akka.persistence._
/** /**
* Abstract journal, optimized for synchronous writes. * Abstract journal, optimized for synchronous writes.
*/ */
trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery with ActorLogging {
import JournalProtocol._ import JournalProtocol._
import context.dispatcher import context.dispatcher
@ -39,22 +39,28 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
} }
throw e throw e
} }
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor, replayDeleted) ⇒ case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor, replayDeleted) ⇒
asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p ⇒ asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p ⇒
if (!p.deleted || replayDeleted) persistentActor.tell(ReplayedMessage(p), p.sender) if (!p.deleted || replayDeleted)
adaptFromJournal(p).foreach { adaptedPersistentRepr ⇒
persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), adaptedPersistentRepr.sender)
}
} map { } map {
case _ ⇒ ReplayMessagesSuccess case _ ⇒ ReplayMessagesSuccess
} recover { } recover {
case e ⇒ ReplayMessagesFailure(e) case e ⇒ ReplayMessagesFailure(e)
} pipeTo (persistentActor) onSuccess { } pipeTo persistentActor onSuccess {
case _ if publish ⇒ context.system.eventStream.publish(r) case _ if publish ⇒ context.system.eventStream.publish(r)
} }
case ReadHighestSequenceNr(fromSequenceNr, persistenceId, persistentActor) ⇒ case ReadHighestSequenceNr(fromSequenceNr, persistenceId, persistentActor) ⇒
asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).map { asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).map {
highest ⇒ ReadHighestSequenceNrSuccess(highest) highest ⇒ ReadHighestSequenceNrSuccess(highest)
} recover { } recover {
case e ⇒ ReadHighestSequenceNrFailure(e) case e ⇒ ReadHighestSequenceNrFailure(e)
} pipeTo (persistentActor) } pipeTo persistentActor
case d @ DeleteMessagesTo(persistenceId, toSequenceNr, permanent) ⇒ case d @ DeleteMessagesTo(persistenceId, toSequenceNr, permanent) ⇒
Try(deleteMessagesTo(persistenceId, toSequenceNr, permanent)) match { Try(deleteMessagesTo(persistenceId, toSequenceNr, permanent)) match {
case Success(_) ⇒ if (publish) context.system.eventStream.publish(d) case Success(_) ⇒ if (publish) context.system.eventStream.publish(d)
View file
@ -4,16 +4,37 @@
package akka.persistence.journal package akka.persistence.journal
import akka.persistence.{ PersistentRepr, PersistentEnvelope }
import akka.actor.Actor import akka.actor.Actor
import akka.persistence.{ Persistence, PersistentEnvelope, PersistentRepr }
import scala.collection.immutable import scala.collection.immutable
private[akka] trait WriteJournalBase { private[akka] trait WriteJournalBase {
this: Actor ⇒ this: Actor ⇒
lazy val persistence = Persistence(context.system)
private def eventAdapters = persistence.adaptersFor(self)
protected def preparePersistentBatch(rb: immutable.Seq[PersistentEnvelope]): immutable.Seq[PersistentRepr] = protected def preparePersistentBatch(rb: immutable.Seq[PersistentEnvelope]): immutable.Seq[PersistentRepr] =
rb.collect { rb.collect { // collect instead of flatMap to avoid Some allocations
case p: PersistentRepr ⇒ p.update(sender = Actor.noSender) // don't store sender case p: PersistentRepr ⇒ adaptToJournal(p.update(sender = Actor.noSender)) // don't store the sender
} }
/** INTERNAL API */
private[akka] final def adaptFromJournal(repr: PersistentRepr): immutable.Seq[PersistentRepr] =
eventAdapters.get(repr.payload.getClass).fromJournal(repr.payload, repr.manifest).events map { adaptedPayload ⇒
repr.withPayload(adaptedPayload)
}
/** INTERNAL API */
private[akka] final def adaptToJournal(repr: PersistentRepr): PersistentRepr = {
val payload = repr.payload
val adapter = eventAdapters.get(payload.getClass)
val manifest = adapter.manifest(payload)
repr
.withPayload(adapter.toJournal(payload))
.withManifest(manifest)
}
} }
View file
@ -10,8 +10,7 @@ import scala.language.postfixOps
import akka.actor._ import akka.actor._
import akka.persistence._ import akka.persistence._
import akka.persistence.journal.AsyncWriteProxy import akka.persistence.journal.{ WriteJournalBase, AsyncWriteProxy, AsyncWriteTarget }
import akka.persistence.journal.AsyncWriteTarget
import akka.util.Timeout import akka.util.Timeout
/** /**
@ -72,7 +71,7 @@ private[persistence] trait InmemMessages {
/** /**
* INTERNAL API. * INTERNAL API.
*/ */
private[persistence] class InmemStore extends Actor with InmemMessages { private[persistence] class InmemStore extends Actor with InmemMessages with WriteJournalBase {
import AsyncWriteTarget._ import AsyncWriteTarget._
def receive = { def receive = {
@ -83,7 +82,7 @@ private[persistence] class InmemStore extends Actor with InmemMessages {
case DeleteMessagesTo(pid, tsnr, true) ⇒ case DeleteMessagesTo(pid, tsnr, true) ⇒
sender() ! (1L to tsnr foreach { snr ⇒ delete(pid, snr) }) sender() ! (1L to tsnr foreach { snr ⇒ delete(pid, snr) })
case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ case ReplayMessages(pid, fromSnr, toSnr, max) ⇒
read(pid, fromSnr, toSnr, max).foreach(sender() ! _) read(pid, fromSnr, toSnr, max).foreach { sender() ! _ }
sender() ! ReplaySuccess sender() ! ReplaySuccess
case ReadHighestSequenceNr(persistenceId, _) ⇒ case ReadHighestSequenceNr(persistenceId, _) ⇒
sender() ! highestSequenceNr(persistenceId) sender() ! highestSequenceNr(persistenceId)
View file
@ -7,20 +7,19 @@ package akka.persistence.journal.leveldb
import java.io.File import java.io.File
import scala.collection.immutable
import scala.util._
import org.iq80.leveldb._
import akka.actor._ import akka.actor._
import akka.persistence._ import akka.persistence._
import akka.persistence.journal.AsyncWriteTarget import akka.persistence.journal.{ WriteJournalBase, AsyncWriteTarget }
import akka.serialization.SerializationExtension import akka.serialization.SerializationExtension
import org.iq80.leveldb._
import scala.collection.immutable
import scala.util._
/** /**
* INTERNAL API. * INTERNAL API.
*/ */
private[persistence] trait LeveldbStore extends Actor with LeveldbIdMapping with LeveldbRecovery { private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with LeveldbIdMapping with LeveldbRecovery {
val configPath: String val configPath: String
val config = context.system.settings.config.getConfig(configPath) val config = context.system.settings.config.getConfig(configPath)
@ -112,11 +111,11 @@ class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.le
import AsyncWriteTarget._ import AsyncWriteTarget._
def receive = { def receive = {
case WriteMessages(msgs) ⇒ sender() ! writeMessages(msgs) case WriteMessages(msgs) ⇒ sender() ! writeMessages(preparePersistentBatch(msgs))
case DeleteMessagesTo(pid, tsnr, permanent) ⇒ sender() ! deleteMessagesTo(pid, tsnr, permanent) case DeleteMessagesTo(pid, tsnr, permanent) ⇒ sender() ! deleteMessagesTo(pid, tsnr, permanent)
case ReadHighestSequenceNr(pid, fromSequenceNr) ⇒ sender() ! readHighestSequenceNr(numericId(pid)) case ReadHighestSequenceNr(pid, fromSequenceNr) ⇒ sender() ! readHighestSequenceNr(numericId(pid))
case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ case ReplayMessages(pid, fromSnr, toSnr, max) ⇒
Try(replayMessages(numericId(pid), fromSnr, toSnr, max)(sender() ! _)) match { Try(replayMessages(numericId(pid), fromSnr, toSnr, max)(p ⇒ adaptFromJournal(p).foreach { sender() ! _ })) match {
case Success(max) ⇒ sender() ! ReplaySuccess case Success(max) ⇒ sender() ! ReplaySuccess
case Failure(cause) ⇒ sender() ! ReplayFailure(cause) case Failure(cause) ⇒ sender() ! ReplayFailure(cause)
} }
View file
@@ -4,20 +4,19 @@
 package akka.persistence.serialization

-import scala.concurrent.duration
-import scala.concurrent.duration.Duration
-import scala.language.existentials
-import com.google.protobuf._
 import akka.actor.{ ActorPath, ExtendedActorSystem }
-import akka.japi.Util.immutableSeq
+import akka.persistence.AtLeastOnceDelivery.{ AtLeastOnceDeliverySnapshot ⇒ AtLeastOnceDeliverySnap, UnconfirmedDelivery }
 import akka.persistence._
+import akka.persistence.fsm.PersistentFsmActor.StateChangeEvent
 import akka.persistence.serialization.MessageFormats._
 import akka.serialization._
-import akka.persistence.AtLeastOnceDelivery.{ AtLeastOnceDeliverySnapshot ⇒ AtLeastOnceDeliverySnap }
+import com.google.protobuf._
-import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery
 import scala.collection.immutable.VectorBuilder
-import akka.persistence.fsm.PersistentFsmActor.StateChangeEvent
+import scala.concurrent.duration
 import akka.actor.Actor
+import scala.concurrent.duration.Duration
+import scala.language.existentials

 /**
  * Marker trait for all protobuf-serializable messages in `akka.persistence`.

@@ -164,6 +163,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
       payload(persistentMessage.getPayload),
       persistentMessage.getSequenceNr,
       if (persistentMessage.hasPersistenceId) persistentMessage.getPersistenceId else Undefined,
+      if (persistentMessage.hasManifest) persistentMessage.getManifest else Undefined,
       persistentMessage.getDeleted,
       if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else Actor.noSender)
   }


@@ -0,0 +1,241 @@
/*
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import java.io.File
import akka.actor._
import akka.persistence.EndToEndEventAdapterSpec.NewA
import akka.persistence.journal.{ EventSeq, EventAdapter }
import akka.testkit.{ ImplicitSender, WatchedByCoroner, AkkaSpec, TestProbe }
import com.typesafe.config.{ Config, ConfigFactory }
import org.apache.commons.io.FileUtils
import org.scalatest.{ WordSpecLike, Matchers, BeforeAndAfterAll }
import scala.concurrent.Await
import scala.concurrent.duration._
object EndToEndEventAdapterSpec {
trait AppModel { def payload: Any }
case class A(payload: Any) extends AppModel
case class B(payload: Any) extends AppModel
case class NewA(payload: Any) extends AppModel
case class NewB(payload: Any) extends AppModel
case class JSON(payload: Any)
  class AEndToEndAdapter(system: ExtendedActorSystem) extends EventAdapter {
    override def manifest(event: Any): String = event.getClass.getCanonicalName

    override def toJournal(event: Any): Any =
      event match { case m: AppModel ⇒ JSON(m.payload) }

    override def fromJournal(event: Any, manifest: String): EventSeq = event match {
      case m: JSON if m.payload.toString.startsWith("a") ⇒ EventSeq.single(A(m.payload))
      case _ ⇒ EventSeq.empty
    }
  }
  class NewAEndToEndAdapter(system: ExtendedActorSystem) extends EventAdapter {
    override def manifest(event: Any): String = event.getClass.getCanonicalName

    override def toJournal(event: Any): Any =
      event match { case m: AppModel ⇒ JSON(m.payload) }

    override def fromJournal(event: Any, manifest: String): EventSeq = event match {
      case m: JSON if m.payload.toString.startsWith("a") ⇒ EventSeq.single(NewA(m.payload))
      case _ ⇒ EventSeq.empty
    }
  }
  class BEndToEndAdapter(system: ExtendedActorSystem) extends EventAdapter {
    override def manifest(event: Any): String = event.getClass.getCanonicalName

    override def toJournal(event: Any): Any =
      event match { case m: AppModel ⇒ JSON(m.payload) }

    override def fromJournal(event: Any, manifest: String): EventSeq = event match {
      case m: JSON if m.payload.toString.startsWith("b") ⇒ EventSeq.single(B(m.payload))
      case _ ⇒ EventSeq.empty
    }
  }
  class NewBEndToEndAdapter(system: ExtendedActorSystem) extends EventAdapter {
    override def manifest(event: Any): String = event.getClass.getCanonicalName

    override def toJournal(event: Any): Any =
      event match { case m: AppModel ⇒ JSON(m.payload) }

    override def fromJournal(event: Any, manifest: String): EventSeq = event match {
      case m: JSON if m.payload.toString.startsWith("b") ⇒ EventSeq.single(NewB(m.payload))
      case _ ⇒ EventSeq.empty
    }
  }
class EndToEndAdapterActor(name: String, override val journalPluginId: String, probe: Option[ActorRef])
extends NamedPersistentActor(name) with PersistentActor {
var state: List[Any] = Nil
val persistIncoming: Receive = {
      case GetState ⇒
        state.reverse.foreach { sender() ! _ }
      case in ⇒
        persist(in) { e ⇒
          state ::= e
          sender() ! e
        }
    }

    override def receiveRecover = {
      case RecoveryCompleted ⇒ // ignore
      case e                 ⇒ state ::= e
}
override def receiveCommand = persistIncoming
override def onReplayFailure(cause: Throwable, event: Option[Any]): Unit =
probe.foreach { _ ! cause }
}
}
abstract class EndToEndEventAdapterSpec(journalName: String, journalConfig: Config)
extends WordSpecLike with Matchers with BeforeAndAfterAll {
import EndToEndEventAdapterSpec._
val storageLocations = List("akka.persistence.journal.leveldb.dir")
    .map(s ⇒ new File(journalConfig.getString(s)))
override protected def beforeAll() {
storageLocations.foreach(FileUtils.deleteDirectory)
}
override protected def afterAll() {
storageLocations.foreach(FileUtils.deleteDirectory)
}
val noAdaptersConfig = ConfigFactory.parseString("")
val adaptersConfig = ConfigFactory.parseString(
s"""
|akka.persistence.journal {
| $journalName {
| event-adapters {
| a = "${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$AEndToEndAdapter"
| b = "${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$BEndToEndAdapter"
| }
| event-adapter-bindings {
| # to journal
| "${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$A" = a
| "${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$B" = b
| # from journal
| "${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$JSON" = [a, b]
|
| }
| }
|}
""".stripMargin)
val newAdaptersConfig = ConfigFactory.parseString(
s"""
|akka.persistence.journal {
| $journalName {
| event-adapters {
| a = "${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$NewAEndToEndAdapter"
| b = "${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$NewBEndToEndAdapter"
| }
| event-adapter-bindings {
| # to journal
| "${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$A" = a
| "${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$B" = b
| # from journal
| "${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$JSON" = [a, b]
|
| }
| }
|}
""".stripMargin)
def persister(name: String, probe: Option[ActorRef] = None)(implicit system: ActorSystem) =
system.actorOf(Props(classOf[EndToEndAdapterActor], name, "akka.persistence.journal." + journalName, probe), name)
  def withActorSystem[T](name: String, config: Config)(block: ActorSystem ⇒ T): T = {
val system = ActorSystem(name, journalConfig withFallback config)
try block(system) finally Await.ready(system.terminate(), 3.seconds)
}
"EventAdapters in end-to-end scenarios" must {
"use the same adapter when reading as was used when writing to the journal" in
      withActorSystem("SimpleSystem", adaptersConfig) { implicit system ⇒
val p = TestProbe()
implicit val ref = p.ref
val p1 = persister("p1")
val a = A("a1")
val b = B("b1")
p1 ! a
p1 ! b
p.expectMsg(a)
p.expectMsg(b)
p.watch(p1)
p1 ! PoisonPill
p.expectTerminated(p1)
val p11 = persister("p1")
p11 ! GetState
p.expectMsg(A("a1"))
p.expectMsg(B("b1"))
}
"allow using an adapter, when write was performed without an adapter" in {
val persistentName = "p2"
      withActorSystem("NoAdapterSystem", adaptersConfig) { implicit system ⇒
val p = TestProbe()
implicit val ref = p.ref
val p2 = persister(persistentName)
val a = A("a1")
val b = B("b1")
p2 ! a
p2 ! b
p.expectMsg(a)
p.expectMsg(b)
p.watch(p2)
p2 ! PoisonPill
p.expectTerminated(p2)
val p11 = persister(persistentName)
p11 ! GetState
p.expectMsg(A("a1"))
p.expectMsg(B("b1"))
}
      withActorSystem("NowAdaptersAddedSystem", newAdaptersConfig) { implicit system ⇒
val p = TestProbe()
implicit val ref = p.ref
val p22 = persister(persistentName)
p22 ! GetState
p.expectMsg(NewA("a1"))
p.expectMsg(NewB("b1"))
}
}
"give nice error message when unable to play back as adapter does not exist" in {
// after some time, we start the system a-new...
// and the adapter originally used for adapting A is missing from the configuration!
val journalPath = s"akka.persistence.journal.$journalName"
val missingAdapterConfig = adaptersConfig
.withoutPath(s"$journalPath.event-adapters.a")
.withoutPath(s"""$journalPath.event-adapter-bindings."${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$A"""")
intercept[IllegalArgumentException] {
        withActorSystem("MissingAdapterSystem", journalConfig.withFallback(missingAdapterConfig)) { implicit system2 ⇒
Persistence(system2).adaptersFor(s"akka.persistence.journal.$journalName").get(classOf[String])
}
}.getMessage should include("was bound to undefined event-adapter: a (bindings: [a, b], known adapters: b)")
}
}
}
// needs persistence between actor systems, thus not running with the inmem journal
class LeveldbEndToEndEventAdapterSpec extends EndToEndEventAdapterSpec("leveldb", PersistenceSpec.config("leveldb", "LeveldbEndToEndEventAdapterSpec"))


@@ -0,0 +1,220 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import akka.actor._
import akka.event.Logging
import akka.persistence.journal.{ SingleEventSeq, EventSeq, EventAdapter }
import akka.testkit.ImplicitSender
import com.typesafe.config.{ Config, ConfigFactory }
import scala.collection.immutable
object InmemEventAdapterSpec {
final val JournalModelClassName = classOf[InmemEventAdapterSpec].getCanonicalName + "$" + classOf[JournalModel].getSimpleName
trait JournalModel {
def payload: Any
def tags: immutable.Set[String]
}
final case class Tagged(payload: Any, tags: immutable.Set[String]) extends JournalModel
final case class NotTagged(payload: Any) extends JournalModel {
override def tags = Set.empty
}
final val DomainEventClassName = classOf[InmemEventAdapterSpec].getCanonicalName + "$" + classOf[DomainEvent].getSimpleName
trait DomainEvent
final case class TaggedDataChanged(tags: immutable.Set[String], value: Int) extends DomainEvent
final case class UserDataChanged(countryCode: String, age: Int) extends DomainEvent
class UserAgeTaggingAdapter extends EventAdapter {
val Adult = Set("adult")
val Minor = Set("minor")
override def toJournal(event: Any): Any = event match {
      case e @ UserDataChanged(_, age) if age > 18 ⇒ Tagged(e, Adult)
      case e @ UserDataChanged(_, age)             ⇒ Tagged(e, Minor)
      case e                                       ⇒ NotTagged(e)
}
override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single {
event match {
        case m: JournalModel ⇒ m.payload
}
}
override def manifest(event: Any): String = ""
}
class ReplayPassThroughAdapter extends UserAgeTaggingAdapter {
override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single {
event match {
        case m: JournalModel ⇒ event // don't unpack, just pass through the JournalModel
}
}
}
class LoggingAdapter(system: ExtendedActorSystem) extends EventAdapter {
final val log = Logging(system, getClass)
override def toJournal(event: Any): Any = {
log.info("On its way to the journal: []: " + event)
event
}
override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single {
log.info("On its way out from the journal: []: " + event)
event
}
override def manifest(event: Any): String = ""
}
class PersistAllIncomingActor(name: String, override val journalPluginId: String)
extends NamedPersistentActor(name) with PersistentActor {
var state: List[Any] = Nil
val persistIncoming: Receive = {
      case GetState ⇒
        state.reverse.foreach { sender() ! _ }
      case in ⇒
        persist(in) { e ⇒
state ::= e
sender() ! e
}
}
override def receiveRecover = {
      case RecoveryCompleted ⇒ // ignore
      case e                 ⇒ state ::= e
}
override def receiveCommand = persistIncoming
}
}
class InmemEventAdapterSpec(journalName: String, journalConfig: Config, adapterConfig: Config)
extends PersistenceSpec(journalConfig.withFallback(adapterConfig)) with ImplicitSender {
import InmemEventAdapterSpec._
def this() {
this("inmem", PersistenceSpec.config("inmem", "InmemPersistentTaggingSpec"), ConfigFactory.parseString(
s"""
|akka.persistence.journal {
|
| common-event-adapters {
| age = "${classOf[InmemEventAdapterSpec].getCanonicalName}$$UserAgeTaggingAdapter"
| replay-pass-through = "${classOf[InmemEventAdapterSpec].getCanonicalName}$$ReplayPassThroughAdapter"
| }
|
| inmem {
| event-adapters = $${akka.persistence.journal.common-event-adapters}
| event-adapter-bindings {
| "${InmemEventAdapterSpec.DomainEventClassName}" = age
| "${InmemEventAdapterSpec.JournalModelClassName}" = age
| }
| }
|
| with-actor-system {
| class = $${akka.persistence.journal.inmem.class}
| dir = "journal-1"
|
| event-adapters {
| logging = "${classOf[InmemEventAdapterSpec].getCanonicalName}$$LoggingAdapter"
| }
| event-adapter-bindings {
| "java.lang.Object" = logging
| }
| }
|
| replay-pass-through-adapter-journal {
| class = $${akka.persistence.journal.inmem.class}
| dir = "journal-2"
|
| event-adapters = $${akka.persistence.journal.common-event-adapters}
| event-adapter-bindings {
| "${InmemEventAdapterSpec.JournalModelClassName}" = replay-pass-through
| "${InmemEventAdapterSpec.DomainEventClassName}" = replay-pass-through
| }
| }
|
| no-adapter {
| class = $${akka.persistence.journal.inmem.class}
| dir = "journal-3"
| }
|}
""".stripMargin))
}
def persister(name: String, journalId: String = journalName) =
system.actorOf(Props(classOf[PersistAllIncomingActor], name, "akka.persistence.journal." + journalId), name)
def toJournal(in: Any, journalId: String = journalName) =
Persistence(system).adaptersFor("akka.persistence.journal." + journalId).get(in.getClass).toJournal(in)
def fromJournal(in: Any, journalId: String = journalName) =
Persistence(system).adaptersFor("akka.persistence.journal." + journalId).get(in.getClass).fromJournal(in, "")
"EventAdapter" must {
"wrap with tags" in {
val event = UserDataChanged("name", 42)
toJournal(event) should equal(Tagged(event, Set("adult")))
}
"unwrap when reading" in {
val event = UserDataChanged("name", 42)
val tagged = Tagged(event, Set("adult"))
toJournal(event) should equal(tagged)
fromJournal(tagged) should equal(SingleEventSeq(event))
}
"create adapter requiring ActorSystem" in {
val event = UserDataChanged("name", 42)
toJournal(event, "with-actor-system") should equal(event)
fromJournal(event, "with-actor-system") should equal(SingleEventSeq(event))
}
"store events after applying adapter" in {
val replayPassThroughJournalId = "replay-pass-through-adapter-journal"
val p1 = persister("p1", journalId = replayPassThroughJournalId)
val m1 = UserDataChanged("name", 64)
val m2 = "hello"
p1 ! m1
p1 ! m2
expectMsg(m1)
expectMsg(m2)
watch(p1)
p1 ! PoisonPill
expectTerminated(p1)
val p11 = persister("p1", journalId = replayPassThroughJournalId)
p11 ! GetState
expectMsg(Tagged(m1, Set("adult")))
expectMsg(m2)
}
"work when plugin defines no adapter" in {
val p2 = persister("p2", journalId = "no-adapter")
val m1 = UserDataChanged("name", 64)
val m2 = "hello"
p2 ! m1
p2 ! m2
expectMsg(m1)
expectMsg(m2)
watch(p2)
p2 ! PoisonPill
expectTerminated(p2)
val p22 = persister("p2", "no-adapter")
p22 ! GetState
expectMsg(m1)
expectMsg(m2)
}
}
}
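The tagging round trip exercised by this spec (wrap on `toJournal`, unwrap on `fromJournal`) can be sketched without any journal infrastructure. The trait and case classes below are simplified local stand-ins mirroring the Akka names, not the actual akka-persistence API:

```scala
// Simplified stand-in for akka.persistence.journal.EventAdapter
// (assumption: fromJournal is modeled as a plain Seq instead of EventSeq).
trait EventAdapter {
  def manifest(event: Any): String
  def toJournal(event: Any): Any
  def fromJournal(event: Any, manifest: String): Seq[Any]
}

final case class Tagged(payload: Any, tags: Set[String])
final case class UserDataChanged(countryCode: String, age: Int)

// Wraps adult users' events with a tag on the way in, unwraps on the way out.
class UserAgeTaggingAdapter extends EventAdapter {
  override def manifest(event: Any): String = ""
  override def toJournal(event: Any): Any = event match {
    case e @ UserDataChanged(_, age) if age > 18 => Tagged(e, Set("adult"))
    case e: UserDataChanged                      => Tagged(e, Set("minor"))
    case e                                       => e
  }
  override def fromJournal(event: Any, manifest: String): Seq[Any] = event match {
    case Tagged(payload, _) => Seq(payload) // unwrap the journal model
    case other              => Seq(other)
  }
}

val adapter = new UserAgeTaggingAdapter
val stored  = adapter.toJournal(UserDataChanged("PL", 42))
println(stored)                          // Tagged(UserDataChanged(PL,42),Set(adult))
println(adapter.fromJournal(stored, "")) // List(UserDataChanged(PL,42))
```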


@@ -4,24 +4,19 @@
 package akka.persistence

-import akka.actor._
+import akka.actor.{ OneForOneStrategy, _ }
 import akka.persistence.journal.AsyncWriteProxy
+import akka.persistence.journal.AsyncWriteTarget.{ ReplayFailure, ReplayMessages, ReplaySuccess, WriteMessages }
 import akka.persistence.journal.inmem.InmemStore
-import akka.testkit.{ ImplicitSender, AkkaSpec }
+import akka.testkit.{ ImplicitSender, TestProbe }
 import akka.util.Timeout
+import com.typesafe.config.Config
 import scala.concurrent.duration._
-import akka.persistence.journal.AsyncWriteTarget.{ ReplayFailure, ReplaySuccess, ReplayMessages, WriteMessages }
 import scala.language.postfixOps
-import scala.Some
-import akka.actor.OneForOneStrategy
 import scala.util.control.NoStackTrace
-import akka.testkit.TestProbe

 object PersistentActorFailureSpec {
-  import PersistentActorSpec.Cmd
-  import PersistentActorSpec.Evt
-  import PersistentActorSpec.ExamplePersistentActor
+  import PersistentActorSpec.{ Cmd, Evt, ExamplePersistentActor }

   class SimulatedException(msg: String) extends RuntimeException(msg) with NoStackTrace


@@ -0,0 +1,102 @@
/*
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal
import akka.actor.ExtendedActorSystem
import akka.testkit.AkkaSpec
import com.typesafe.config.ConfigFactory
class EventAdaptersSpec extends AkkaSpec {
val config = ConfigFactory.parseString(
s"""
|akka.persistence.journal {
| plugin = "akka.persistence.journal.inmem"
|
|
| # adapters defined for all plugins
| common-event-adapter-bindings {
| }
|
| inmem {
      |    # showcases re-using and concatenating configuration of adapters
|
| event-adapters {
| example = ${classOf[ExampleEventAdapter].getCanonicalName}
| marker = ${classOf[MarkerInterfaceAdapter].getCanonicalName}
| precise = ${classOf[PreciseAdapter].getCanonicalName}
| }
| event-adapter-bindings = {
| "${classOf[EventMarkerInterface].getCanonicalName}" = marker
| "java.lang.String" = example
| "akka.persistence.journal.PreciseAdapterEvent" = precise
| }
| }
|}
""".stripMargin).withFallback(ConfigFactory.load())
val extendedActorSystem = system.asInstanceOf[ExtendedActorSystem]
val inmemConfig = config.getConfig("akka.persistence.journal.inmem")
"EventAdapters" must {
"parse configuration and resolve adapter definitions" in {
val adapters = EventAdapters(extendedActorSystem, inmemConfig)
adapters.get(classOf[EventMarkerInterface]).getClass should ===(classOf[MarkerInterfaceAdapter])
}
"pick the most specific adapter available" in {
val adapters = EventAdapters(extendedActorSystem, inmemConfig)
// sanity check; precise case, matching non-user classes
adapters.get(classOf[java.lang.String]).getClass should ===(classOf[ExampleEventAdapter])
// pick adapter by implemented marker interface
adapters.get(classOf[SampleEvent]).getClass should ===(classOf[MarkerInterfaceAdapter])
// more general adapter matches as well, but most specific one should be picked
adapters.get(classOf[PreciseAdapterEvent]).getClass should ===(classOf[PreciseAdapter])
// no adapter defined for Long, should return identity adapter
adapters.get(classOf[java.lang.Long]).getClass should ===(IdentityEventAdapter.getClass)
}
"fail with useful message when binding to not defined adapter" in {
val badConfig = ConfigFactory.parseString(
"""
|akka.persistence.journal.inmem {
| event-adapter-bindings {
| "java.lang.Integer" = undefined-adapter
| }
|}
""".stripMargin)
val combinedConfig = badConfig.getConfig("akka.persistence.journal.inmem")
val ex = intercept[IllegalArgumentException] {
EventAdapters(extendedActorSystem, combinedConfig)
}
ex.getMessage should include("java.lang.Integer was bound to undefined event-adapter: undefined-adapter")
}
}
}
abstract class BaseTestAdapter extends EventAdapter {
override def toJournal(event: Any): Any = event
override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single(event)
override def manifest(event: Any): String = ""
}
class ExampleEventAdapter extends BaseTestAdapter {
}
class MarkerInterfaceAdapter extends BaseTestAdapter {
}
class PreciseAdapter extends BaseTestAdapter {
}
trait EventMarkerInterface
final case class SampleEvent() extends EventMarkerInterface
final case class PreciseAdapterEvent() extends EventMarkerInterface
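The "most specific adapter wins" lookup this spec tests can be sketched with plain class-hierarchy checks. This is a hypothetical reimplementation for illustration — `bindings`, `adapterFor`, and the adapter names are local stand-ins, not the real `EventAdapters` internals:

```scala
trait EventMarkerInterface
final case class SampleEvent() extends EventMarkerInterface
final case class PreciseEvent() extends EventMarkerInterface

// Bindings map a class to an adapter name; several classes may share one
// adapter, and one event class may match several bound supertypes.
val bindings: List[(Class[_], String)] = List(
  classOf[EventMarkerInterface] -> "marker",
  classOf[PreciseEvent]         -> "precise",
  classOf[String]               -> "example")

// Keep the bindings whose bound class is a supertype of the event class,
// order the most specific one first, and fall back to an identity adapter.
def adapterFor(clazz: Class[_]): String =
  bindings
    .filter(b => b._1.isAssignableFrom(clazz))
    .sortWith((x, y) => y._1.isAssignableFrom(x._1) && x._1 != y._1)
    .headOption.map(_._2)
    .getOrElse("identity")

println(adapterFor(classOf[SampleEvent]))    // marker
println(adapterFor(classOf[PreciseEvent]))   // precise (beats the marker binding)
println(adapterFor(classOf[java.lang.Long])) // identity
```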


@@ -62,9 +62,8 @@ object SharedLeveldbJournalSpec {
       case m ⇒ p forward m
     }

-    override def preStart(): Unit = {
+    override def preStart(): Unit =
       context.actorSelection(storePath) ! Identify(1)
-    }
   }
 }


@@ -4,20 +4,17 @@
 package akka.persistence.serialization

-import scala.collection.immutable
-import com.typesafe.config._
 import akka.actor._
+import akka.persistence.AtLeastOnceDelivery.{ AtLeastOnceDeliverySnapshot, UnconfirmedDelivery }
 import akka.persistence._
 import akka.serialization._
 import akka.testkit._
-import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot
-import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery
 import akka.util.ByteString.UTF_8
-import scala.concurrent.Await
+import com.typesafe.config._
-import scala.concurrent.duration.Duration
 import org.apache.commons.codec.binary.Hex.decodeHex
-import SerializerSpecConfigs._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration

 object SerializerSpecConfigs {
   val customSerializers = ConfigFactory.parseString(

@@ -66,7 +63,7 @@ object SerializerSpecConfigs {
 }

-import SerializerSpecConfigs._
+import akka.persistence.serialization.SerializerSpecConfigs._

 class SnapshotSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
   val serialization = SerializationExtension(system)

@@ -148,7 +145,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
   "A message serializer" when {
     "not given a manifest" must {
       "handle custom Persistent message serialization" in {
-        val persistent = PersistentRepr(MyPayload("a"), 13, "p1", true)
+        val persistent = PersistentRepr(MyPayload("a"), 13, "p1", "", true)
         val serializer = serialization.findSerializerFor(persistent)
         val bytes = serializer.toBinary(persistent)

@@ -160,7 +157,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
     "given a PersistentRepr manifest" must {
       "handle custom Persistent message serialization" in {
-        val persistent = PersistentRepr(MyPayload("b"), 13, "p1", true)
+        val persistent = PersistentRepr(MyPayload("b"), 13, "p1", "", true)
         val serializer = serialization.findSerializerFor(persistent)
         val bytes = serializer.toBinary(persistent)

@@ -172,7 +169,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
     "given payload serializer with string manifest" must {
       "handle serialization" in {
-        val persistent = PersistentRepr(MyPayload2("a", 17), 13, "p1", true)
+        val persistent = PersistentRepr(MyPayload2("a", 17), 13, "p1", "", true)
         val serializer = serialization.findSerializerFor(persistent)
         val bytes = serializer.toBinary(persistent)

@@ -195,7 +192,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
     }

     "be able to deserialize data when class is removed" in {
-      val serializer = serialization.findSerializerFor(PersistentRepr("x", 13, "p1", true))
+      val serializer = serialization.findSerializerFor(PersistentRepr("x", 13, "p1", "", true))

       // It was created with:
       // val old = PersistentRepr(OldPayload('A'), 13, "p1", true, testActor)

@@ -237,7 +234,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
         "31393337"
       val bytes = decodeHex(oldData.toCharArray)
-      val expected = PersistentRepr(MyPayload(".a."), 13, "p1", true, Actor.noSender)
+      val expected = PersistentRepr(MyPayload(".a."), 13, "p1", "", true, Actor.noSender)
       val serializer = serialization.findSerializerFor(expected)
       val deserialized = serializer.fromBinary(bytes, None).asInstanceOf[PersistentRepr]
       deserialized.sender should not be (null)


@@ -24,7 +24,7 @@ object PersistentPublisherSpec {
   }
 }

-class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "ViewProducerSpec", serialization = "off")) with PersistenceSpec {
+class PersistentPublisherSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "ViewProducerSpec", serialization = "off")) {
   import PersistentPublisherSpec._

   val numMessages = 10


@@ -36,6 +36,11 @@ object Dependencies {
   // TODO remove with metrics from akka-cluster
   val sigar = "org.fusesource" % "sigar" % "1.6.4" // ApacheV2

+  object Docs {
+    val sprayJson = "io.spray" %% "spray-json" % "1.3.2" % "test"
+    val gson = "com.google.code.gson" % "gson" % "2.3.1" % "test"
+  }
+
   object Test {
     val commonsMath = "org.apache.commons" % "commons-math" % "2.2" % "test" // ApacheV2
     val commonsIo = "commons-io" % "commons-io" % "2.4" % "test" // ApacheV2

@@ -111,7 +116,7 @@ object Dependencies {
   val osgi = l ++= Seq(osgiCore, osgiCompendium, Test.logback, Test.commonsIo, Test.pojosr, Test.tinybundles, Test.scalatest.value, Test.junit)

-  val docs = l ++= Seq(Test.scalatest.value, Test.junit, Test.junitIntf)
+  val docs = l ++= Seq(Test.scalatest.value, Test.junit, Test.junitIntf, Docs.sprayJson, Docs.gson)

   val contrib = l ++= Seq(Test.junitIntf, Test.commonsIo)
 }