+per #18137 EventAdapter => can be Read / Write or both

This commit is contained in:
Konrad Malawski 2015-08-19 13:58:29 +02:00
parent 3e2063bac2
commit 4e59f0ea35
6 changed files with 100 additions and 7 deletions

View file

@ -15,7 +15,7 @@ import akka.actor.ActorRef
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
import akka.cluster.sharding.Shard import akka.cluster.sharding.Shard
import akka.cluster.sharding.ShardCoordinator import akka.cluster.sharding.ShardCoordinator
import akka.cluster.sharding.protobuf.msg.{ ClusterShardingMessages => sm } import akka.cluster.sharding.protobuf.msg.{ ClusterShardingMessages sm }
import akka.serialization.BaseSerializer import akka.serialization.BaseSerializer
import akka.serialization.Serialization import akka.serialization.Serialization
import akka.serialization.SerializationExtension import akka.serialization.SerializationExtension

View file

@ -12,7 +12,7 @@ public class PersistenceEventAdapterDocTest {
@SuppressWarnings("unused") @SuppressWarnings("unused")
static static
//#identity-event-adapter //#identity-event-adapter
class MyEventAdapter extends EventAdapter { class MyEventAdapter implements EventAdapter {
@Override @Override
public String manifest(Object event) { public String manifest(Object event) {
return ""; // if no manifest needed, return "" return ""; // if no manifest needed, return ""

View file

@ -276,8 +276,9 @@ class UserEventsAdapter extends EventAdapter {
override def fromJournal(event: Any, manifest: String): EventSeq = event match { override def fromJournal(event: Any, manifest: String): EventSeq = event match {
case UserDetailsChanged(null, address) => EventSeq(UserAddressChanged(address)) case UserDetailsChanged(null, address) => EventSeq(UserAddressChanged(address))
case UserDetailsChanged(name, null) => EventSeq(UserNameChanged(name)) case UserDetailsChanged(name, null) => EventSeq(UserNameChanged(name))
case UserDetailsChanged(name, address) => EventSeq(UserNameChanged(name), case UserDetailsChanged(name, address) =>
UserAddressChanged(address)) EventSeq(UserNameChanged(name),
UserAddressChanged(address))
case event: V2 => EventSeq(event) case event: V2 => EventSeq(event)
} }

View file

@ -8,6 +8,7 @@ import scala.annotation.varargs
import scala.collection.immutable import scala.collection.immutable
/** /**
* An [[EventAdapter]] is both a [[WriteEventAdapter]] and a [[ReadEventAdapter]].
* Facility to convert from and to specialised data models, as may be required by specialized persistence Journals. * Facility to convert from and to specialised data models, as may be required by specialized persistence Journals.
* *
* Typical use cases include (but are not limited to): * Typical use cases include (but are not limited to):
@ -17,7 +18,19 @@ import scala.collection.immutable
* <li>adapting incoming events in any way before persisting them by the journal</li> * <li>adapting incoming events in any way before persisting them by the journal</li>
* </ul> * </ul>
*/ */
abstract class EventAdapter { trait EventAdapter extends WriteEventAdapter with ReadEventAdapter
/**
* Facility to convert to specialised data models, as may be required by specialized persistence Journals.
*
* Typical use cases include (but are not limited to):
* <ul>
* <li>adding metadata, a.k.a. "tagging" - by wrapping objects into tagged counterparts</li>
* <li>manually converting to the Journals storage format, such as JSON, BSON or any specialised binary format</li>
* <li>splitting up large events into sequences of smaller ones</li>
* </ul>
*/
trait WriteEventAdapter {
//#event-adapter-api //#event-adapter-api
/** /**
* Return the manifest (type hint) that will be provided in the `fromJournal` method. * Return the manifest (type hint) that will be provided in the `fromJournal` method.
@ -39,7 +52,21 @@ abstract class EventAdapter {
* @return the adapted event object, possibly the same object if no adaptation was performed * @return the adapted event object, possibly the same object if no adaptation was performed
*/ */
def toJournal(event: Any): Any def toJournal(event: Any): Any
//#event-adapter-api
}
/**
* Facility to convert from and to specialised data models, as may be required by specialized persistence Journals.
*
* Typical use cases include (but are not limited to):
* <ul>
* <li>extracting events from "envelopes"</li>
* <li>manually converting to the Journals storage format, such as JSON, BSON or any specialised binary format</li>
* <li>adapting incoming events from a "data model" to the "domain model"</li>
* </ul>
*/
trait ReadEventAdapter {
//#event-adapter-api
/** /**
* Convert a event from its journal model to the applications domain model. * Convert a event from its journal model to the applications domain model.
* *
@ -54,7 +81,6 @@ abstract class EventAdapter {
* @return sequence containing the adapted events (possibly zero) which will be delivered to the PersistentActor * @return sequence containing the adapted events (possibly zero) which will be delivered to the PersistentActor
*/ */
def fromJournal(event: Any, manifest: String): EventSeq def fromJournal(event: Any, manifest: String): EventSeq
//#event-adapter-api //#event-adapter-api
} }
@ -90,3 +116,25 @@ final case object IdentityEventAdapter extends EventAdapter {
override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single(event) override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single(event)
override def manifest(event: Any): String = "" override def manifest(event: Any): String = ""
} }
/** INTERNAL API */
private[akka] case class NoopWriteEventAdapter(private val readEventAdapter: ReadEventAdapter) extends EventAdapter {
// pass-through read
override def fromJournal(event: Any, manifest: String): EventSeq =
readEventAdapter.fromJournal(event, manifest)
// no-op write
override def manifest(event: Any): String = ""
override def toJournal(event: Any): Any = event
}
/** INTERNAL API */
private[akka] case class NoopReadEventAdapter(private val writeEventAdapter: WriteEventAdapter) extends EventAdapter {
// pass-through write
override def manifest(event: Any): String = writeEventAdapter.manifest(event)
override def toJournal(event: Any): Any = writeEventAdapter.toJournal(event)
// no-op read
override def fromJournal(event: Any, manifest: String): EventSeq =
EventSeq(event)
}

View file

@ -79,7 +79,7 @@ private[akka] object EventAdapters {
// A Map of handler from alias to implementation (i.e. class implementing akka.serialization.Serializer) // A Map of handler from alias to implementation (i.e. class implementing akka.serialization.Serializer)
// For example this defines a handler named 'country': `"country" -> com.example.comain.CountryTagsAdapter` // For example this defines a handler named 'country': `"country" -> com.example.comain.CountryTagsAdapter`
val handlers = for ((k: String, v: String) adapters) yield k -> instantiate[EventAdapter](v, system).get val handlers = for ((k: String, v: String) adapters) yield k -> instantiateAdapter(v, system).get
// bindings is a Seq of tuple representing the mapping from Class to handler. // bindings is a Seq of tuple representing the mapping from Class to handler.
// It is primarily ordered by the most specific classes first, and secondly in the configured order. // It is primarily ordered by the most specific classes first, and secondly in the configured order.
@ -96,6 +96,18 @@ private[akka] object EventAdapters {
new EventAdapters(backing, bindings, system.log) new EventAdapters(backing, bindings, system.log)
} }
def instantiateAdapter(adapterFQN: String, system: ExtendedActorSystem): Try[EventAdapter] = {
val clazz = system.dynamicAccess.getClassFor[Any](adapterFQN).get
if (classOf[EventAdapter] isAssignableFrom clazz)
instantiate[EventAdapter](adapterFQN, system)
else if (classOf[WriteEventAdapter] isAssignableFrom clazz)
instantiate[WriteEventAdapter](adapterFQN, system).map(NoopReadEventAdapter)
else if (classOf[ReadEventAdapter] isAssignableFrom clazz)
instantiate[ReadEventAdapter](adapterFQN, system).map(NoopWriteEventAdapter)
else
throw new IllegalArgumentException(s"Configured $adapterFQN does not implement any EventAdapter interface!")
}
/** INTERNAL API */ /** INTERNAL API */
private[akka] case class CombinedReadEventAdapter(adapters: immutable.Seq[EventAdapter]) extends EventAdapter { private[akka] case class CombinedReadEventAdapter(adapters: immutable.Seq[EventAdapter]) extends EventAdapter {
private def onlyReadSideException = new IllegalStateException("CombinedReadEventAdapter must not be used when writing (creating manifests) events!") private def onlyReadSideException = new IllegalStateException("CombinedReadEventAdapter must not be used when writing (creating manifests) events!")

View file

@ -27,11 +27,15 @@ class EventAdaptersSpec extends AkkaSpec {
| example = ${classOf[ExampleEventAdapter].getCanonicalName} | example = ${classOf[ExampleEventAdapter].getCanonicalName}
| marker = ${classOf[MarkerInterfaceAdapter].getCanonicalName} | marker = ${classOf[MarkerInterfaceAdapter].getCanonicalName}
| precise = ${classOf[PreciseAdapter].getCanonicalName} | precise = ${classOf[PreciseAdapter].getCanonicalName}
| reader = ${classOf[ReaderAdapter].getCanonicalName}
| writer = ${classOf[WriterAdapter].getCanonicalName}
| } | }
| event-adapter-bindings = { | event-adapter-bindings = {
| "${classOf[EventMarkerInterface].getCanonicalName}" = marker | "${classOf[EventMarkerInterface].getCanonicalName}" = marker
| "java.lang.String" = example | "java.lang.String" = example
| "akka.persistence.journal.PreciseAdapterEvent" = precise | "akka.persistence.journal.PreciseAdapterEvent" = precise
| "akka.persistence.journal.ReadMeEvent" = reader
| "akka.persistence.journal.WriteMeEvent" = writer
| } | }
| } | }
|} |}
@ -79,6 +83,22 @@ class EventAdaptersSpec extends AkkaSpec {
ex.getMessage should include("java.lang.Integer was bound to undefined event-adapter: undefined-adapter") ex.getMessage should include("java.lang.Integer was bound to undefined event-adapter: undefined-adapter")
} }
"allow implementing only the read-side (ReadEventAdapter)" in {
val adapters = EventAdapters(extendedActorSystem, inmemConfig)
// read-side only adapter
val r: EventAdapter = adapters.get(classOf[ReadMeEvent])
r.fromJournal(r.toJournal(ReadMeEvent()), "").events.head.toString should ===("from-ReadMeEvent()")
}
"allow implementing only the write-side (WriteEventAdapter)" in {
val adapters = EventAdapters(extendedActorSystem, inmemConfig)
// write-side only adapter
val w: EventAdapter = adapters.get(classOf[WriteMeEvent])
w.fromJournal(w.toJournal(WriteMeEvent()), "").events.head.toString should ===("to-WriteMeEvent()")
}
} }
} }
@ -96,6 +116,18 @@ class MarkerInterfaceAdapter extends BaseTestAdapter {
class PreciseAdapter extends BaseTestAdapter { class PreciseAdapter extends BaseTestAdapter {
} }
case class ReadMeEvent()
class ReaderAdapter extends ReadEventAdapter {
override def fromJournal(event: Any, manifest: String): EventSeq =
EventSeq("from-" + event)
}
case class WriteMeEvent()
class WriterAdapter extends WriteEventAdapter {
override def manifest(event: Any): String = ""
override def toJournal(event: Any): Any = "to-" + event
}
trait EventMarkerInterface trait EventMarkerInterface
final case class SampleEvent() extends EventMarkerInterface final case class SampleEvent() extends EventMarkerInterface
final case class PreciseAdapterEvent() extends EventMarkerInterface final case class PreciseAdapterEvent() extends EventMarkerInterface