diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala index 2601508c6c..660b123acf 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala @@ -15,7 +15,7 @@ import akka.actor.ActorRef import akka.actor.ExtendedActorSystem import akka.cluster.sharding.Shard import akka.cluster.sharding.ShardCoordinator -import akka.cluster.sharding.protobuf.msg.{ ClusterShardingMessages => sm } +import akka.cluster.sharding.protobuf.msg.{ ClusterShardingMessages ⇒ sm } import akka.serialization.BaseSerializer import akka.serialization.Serialization import akka.serialization.SerializationExtension diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceEventAdapterDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceEventAdapterDocTest.java index b373625b41..896fb3278b 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceEventAdapterDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceEventAdapterDocTest.java @@ -12,7 +12,7 @@ public class PersistenceEventAdapterDocTest { @SuppressWarnings("unused") static //#identity-event-adapter - class MyEventAdapter extends EventAdapter { + class MyEventAdapter implements EventAdapter { @Override public String manifest(Object event) { return ""; // if no manifest needed, return "" diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceSchemaEvolutionDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceSchemaEvolutionDocSpec.scala index 6aa3ff66ac..182d4bf605 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceSchemaEvolutionDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceSchemaEvolutionDocSpec.scala @@ -276,8 +276,9 @@ class UserEventsAdapter extends EventAdapter { override def fromJournal(event: Any, manifest: String): EventSeq = event match { case UserDetailsChanged(null, address) => EventSeq(UserAddressChanged(address)) case UserDetailsChanged(name, null) => EventSeq(UserNameChanged(name)) - case UserDetailsChanged(name, address) => EventSeq(UserNameChanged(name), - UserAddressChanged(address)) + case UserDetailsChanged(name, address) => + EventSeq(UserNameChanged(name), + UserAddressChanged(address)) case event: V2 => EventSeq(event) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapter.scala b/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapter.scala index 0ba707e525..bdbf36457c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapter.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapter.scala @@ -8,6 +8,7 @@ import scala.annotation.varargs import scala.collection.immutable /** + * An [[EventAdapter]] is both a [[WriteEventAdapter]] and a [[ReadEventAdapter]]. * Facility to convert from and to specialised data models, as may be required by specialized persistence Journals. * * Typical use cases include (but are not limited to): @@ -17,7 +18,19 @@ import scala.collection.immutable *
  • adapting incoming events in any way before persisting them by the journal
  • * */ -abstract class EventAdapter { +trait EventAdapter extends WriteEventAdapter with ReadEventAdapter + +/** + * Facility to convert to specialised data models, as may be required by specialized persistence Journals. + * + * Typical use cases include (but are not limited to): + * + */ +trait WriteEventAdapter { //#event-adapter-api /** * Return the manifest (type hint) that will be provided in the `fromJournal` method. @@ -39,7 +52,21 @@ abstract class EventAdapter { * @return the adapted event object, possibly the same object if no adaptation was performed */ def toJournal(event: Any): Any + //#event-adapter-api +} +/** + * Facility to convert from and to specialised data models, as may be required by specialized persistence Journals. + * + * Typical use cases include (but are not limited to): + * + */ +trait ReadEventAdapter { + //#event-adapter-api /** * Convert a event from its journal model to the applications domain model. * @@ -54,7 +81,6 @@ abstract class EventAdapter { * @return sequence containing the adapted events (possibly zero) which will be delivered to the PersistentActor */ def fromJournal(event: Any, manifest: String): EventSeq - //#event-adapter-api } @@ -90,3 +116,25 @@ final case object IdentityEventAdapter extends EventAdapter { override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single(event) override def manifest(event: Any): String = "" } + +/** INTERNAL API */ +private[akka] case class NoopWriteEventAdapter(private val readEventAdapter: ReadEventAdapter) extends EventAdapter { + // pass-through read + override def fromJournal(event: Any, manifest: String): EventSeq = + readEventAdapter.fromJournal(event, manifest) + + // no-op write + override def manifest(event: Any): String = "" + override def toJournal(event: Any): Any = event +} + +/** INTERNAL API */ +private[akka] case class NoopReadEventAdapter(private val writeEventAdapter: WriteEventAdapter) extends EventAdapter { + // pass-through write + override def manifest(event: Any): String = writeEventAdapter.manifest(event) + override def toJournal(event: Any): Any = writeEventAdapter.toJournal(event) + + // no-op read + override def fromJournal(event: Any, manifest: String): EventSeq = + EventSeq(event) +} \ No newline at end of file diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapters.scala b/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapters.scala index e8b1efcd4f..80628189c3 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapters.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapters.scala @@ -79,7 +79,7 @@ private[akka] object EventAdapters { // A Map of handler from alias to implementation (i.e. class implementing akka.serialization.Serializer) // For example this defines a handler named 'country': `"country" -> com.example.comain.CountryTagsAdapter` - val handlers = for ((k: String, v: String) ← adapters) yield k -> instantiate[EventAdapter](v, system).get + val handlers = for ((k: String, v: String) ← adapters) yield k -> instantiateAdapter(v, system).get // bindings is a Seq of tuple representing the mapping from Class to handler. // It is primarily ordered by the most specific classes first, and secondly in the configured order. @@ -96,6 +96,18 @@ private[akka] object EventAdapters { new EventAdapters(backing, bindings, system.log) } + def instantiateAdapter(adapterFQN: String, system: ExtendedActorSystem): Try[EventAdapter] = { + val clazz = system.dynamicAccess.getClassFor[Any](adapterFQN).get + if (classOf[EventAdapter] isAssignableFrom clazz) + instantiate[EventAdapter](adapterFQN, system) + else if (classOf[WriteEventAdapter] isAssignableFrom clazz) + instantiate[WriteEventAdapter](adapterFQN, system).map(NoopReadEventAdapter) + else if (classOf[ReadEventAdapter] isAssignableFrom clazz) + instantiate[ReadEventAdapter](adapterFQN, system).map(NoopWriteEventAdapter) + else + throw new IllegalArgumentException(s"Configured $adapterFQN does not implement any EventAdapter interface!") + } + /** INTERNAL API */ private[akka] case class CombinedReadEventAdapter(adapters: immutable.Seq[EventAdapter]) extends EventAdapter { private def onlyReadSideException = new IllegalStateException("CombinedReadEventAdapter must not be used when writing (creating manifests) events!") diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/EventAdaptersSpec.scala b/akka-persistence/src/test/scala/akka/persistence/journal/EventAdaptersSpec.scala index 64094caea3..814357791c 100644 --- a/akka-persistence/src/test/scala/akka/persistence/journal/EventAdaptersSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/journal/EventAdaptersSpec.scala @@ -27,11 +27,15 @@ class EventAdaptersSpec extends AkkaSpec { | example = ${classOf[ExampleEventAdapter].getCanonicalName} | marker = ${classOf[MarkerInterfaceAdapter].getCanonicalName} | precise = ${classOf[PreciseAdapter].getCanonicalName} + | reader = ${classOf[ReaderAdapter].getCanonicalName} + | writer = ${classOf[WriterAdapter].getCanonicalName} | } | event-adapter-bindings = { | "${classOf[EventMarkerInterface].getCanonicalName}" = marker | "java.lang.String" = example | "akka.persistence.journal.PreciseAdapterEvent" = precise + | "akka.persistence.journal.ReadMeEvent" = reader + | "akka.persistence.journal.WriteMeEvent" = writer | } | } |} @@ -79,6 +83,22 @@ class EventAdaptersSpec extends AkkaSpec { ex.getMessage should include("java.lang.Integer was bound to undefined event-adapter: undefined-adapter") } + + "allow implementing only the read-side (ReadEventAdapter)" in { + val adapters = EventAdapters(extendedActorSystem, inmemConfig) + + // read-side only adapter + val r: EventAdapter = adapters.get(classOf[ReadMeEvent]) + r.fromJournal(r.toJournal(ReadMeEvent()), "").events.head.toString should ===("from-ReadMeEvent()") + } + + "allow implementing only the write-side (WriteEventAdapter)" in { + val adapters = EventAdapters(extendedActorSystem, inmemConfig) + + // write-side only adapter + val w: EventAdapter = adapters.get(classOf[WriteMeEvent]) + w.fromJournal(w.toJournal(WriteMeEvent()), "").events.head.toString should ===("to-WriteMeEvent()") + } } } @@ -96,6 +116,18 @@ class MarkerInterfaceAdapter extends BaseTestAdapter { class PreciseAdapter extends BaseTestAdapter { } +case class ReadMeEvent() +class ReaderAdapter extends ReadEventAdapter { + override def fromJournal(event: Any, manifest: String): EventSeq = + EventSeq("from-" + event) +} + +case class WriteMeEvent() +class WriterAdapter extends WriteEventAdapter { + override def manifest(event: Any): String = "" + override def toJournal(event: Any): Any = "to-" + event +} + trait EventMarkerInterface final case class SampleEvent() extends EventMarkerInterface final case class PreciseAdapterEvent() extends EventMarkerInterface