diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index bdef9e2353..11058a876f 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -79,7 +79,7 @@ Architecture * *AbstractPersistentActorAtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in case of sender and receiver JVM crashes. -* *Journal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages +* *AsyncWriteJournal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages are journaled and which are received by the persistent actor without being journaled. The storage backend of a journal is pluggable. Persistence extension comes with a "leveldb" journal plugin, which writes to the local filesystem, and replicated journals are available as `Community plugins`_. diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 5a1832f485..71a2f8b2ce 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -83,7 +83,7 @@ Architecture * *UntypedPersistentActorAtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in case of sender and receiver JVM crashes. -* *Journal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages +* *AsyncWriteJournal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages are journaled and which are received by the persistent actor without being journaled. The storage backend of a journal is pluggable. Persistence extension comes with a "leveldb" journal plugin, which writes to the local filesystem, and replicated journals are available as `Community plugins`_. diff --git a/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala new file mode 100644 index 0000000000..ac8dab1925 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2015 Typesafe Inc. + */ + +package docs.persistence.query + +import akka.actor.Props +import akka.persistence.PersistentRepr +import akka.persistence.query.EventEnvelope +import akka.serialization.SerializationExtension +import akka.stream.actor.ActorPublisher +import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request } + +import scala.annotation.tailrec +import scala.concurrent.duration.FiniteDuration + +object MyEventsByTagPublisher { + def props(tag: String, offset: Long, refreshInterval: FiniteDuration): Props = + Props(new MyEventsByTagPublisher(tag, offset, refreshInterval)) +} + +//#events-by-tag-publisher +class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteDuration) + extends ActorPublisher[EventEnvelope] { + import MyEventsByTagPublisher._ + + private case object Continue + + private val limit = 1000 + + private var currentId = 0L + var buf = Vector.empty[EventEnvelope] + + import context.dispatcher + val continueTask = context.system.scheduler.schedule( + refreshInterval, refreshInterval, self, Continue) + + override def postStop(): Unit = { + continueTask.cancel() + } + + def receive = { + case _: Request | Continue ⇒ + query() + deliverBuf() + + case Cancel ⇒ + context.stop(self) + } + + def query(): Unit = + if (buf.isEmpty) { + try { + // Could be an SQL query, for example: + // "SELECT id, persistent_repr FROM journal WHERE tag = like ? and " + + // "id >= ? ORDER BY id limit ?" + val result: Vector[(Long, Array[Byte])] = ??? + currentId = if (result.nonEmpty) result.last._1 else currentId + val serialization = SerializationExtension(context.system) + + buf = result.map { + case (id, bytes) ⇒ + val p = serialization.deserialize(bytes, classOf[PersistentRepr]).get + EventEnvelope(offset = id, p.persistenceId, p.sequenceNr, p.payload) + } + } catch { + case e: Exception ⇒ + onErrorThenStop(e) + } + } + + @tailrec final def deliverBuf(): Unit = + if (totalDemand > 0 && buf.nonEmpty) { + if (totalDemand <= Int.MaxValue) { + val (use, keep) = buf.splitAt(totalDemand.toInt) + buf = keep + use foreach onNext + } else { + val (use, keep) = buf.splitAt(Int.MaxValue) + buf = keep + use foreach onNext + deliverBuf() + } + } +} +//#events-by-tag-publisher \ No newline at end of file diff --git a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala new file mode 100644 index 0000000000..0c4dc391b6 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala @@ -0,0 +1,260 @@ +/* + * Copyright (C) 2015 Typesafe Inc. + */ + +package docs.persistence.query + +import akka.actor._ +import akka.persistence.query.scaladsl.ReadJournal +import akka.persistence.{ Recovery, PersistentActor } +import akka.persistence.query._ +import akka.stream.{ FlowShape, ActorMaterializer } +import akka.stream.scaladsl.FlowGraph +import akka.stream.scaladsl.{ Flow, Sink, Source } +import akka.testkit.AkkaSpec +import akka.util.Timeout +import docs.persistence.query.PersistenceQueryDocSpec.{ DummyStore, TheOneWhoWritesToQueryJournal } +import org.reactivestreams.Subscriber + +import scala.collection.immutable +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ + +object PersistenceQueryDocSpec { + + implicit val timeout = Timeout(3.seconds) + + //#my-read-journal + class MyReadJournal(system: ExtendedActorSystem) extends ReadJournal { + + // TODO from config + private val defaulRefreshInterval: FiniteDuration = 3.seconds + + override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = + q match { + case EventsByTag(tag, offset) ⇒ + val props = MyEventsByTagPublisher.props(tag, offset, refreshInterval(hints)) + Source.actorPublisher[EventEnvelope](props) + .mapMaterializedValue(_ ⇒ noMaterializedValue) + + case unsupported ⇒ + Source.failed[T]( + new UnsupportedOperationException( + s"Query $unsupported not supported by ${getClass.getName}")) + .mapMaterializedValue(_ ⇒ noMaterializedValue) + } + + private def refreshInterval(hints: Seq[Hint]): FiniteDuration = + hints.collectFirst { case RefreshInterval(interval) ⇒ interval } + .getOrElse(defaulRefreshInterval) + + private def noMaterializedValue[M]: M = + null.asInstanceOf[M] + } + + //#my-read-journal + case class ComplexState() { + def readyToSave = false + } + case class Record(any: Any) + class DummyStore { def save(record: Record) = Future.successful(42L) } + + class X { + val JournalId = "" + + def convertToReadSideTypes(in: Any): Any = ??? + + object ReactiveStreamsCompatibleDBDriver { + def batchWriter: Subscriber[immutable.Seq[Any]] = ??? + } + + //#projection-into-different-store-rs + implicit val system = ActorSystem() + implicit val mat = ActorMaterializer() + + val readJournal = PersistenceQuery(system).readJournalFor(JournalId) + val dbBatchWriter: Subscriber[immutable.Seq[Any]] = + ReactiveStreamsCompatibleDBDriver.batchWriter + + // Using an example (Reactive Streams) Database driver + readJournal + .query(EventsByPersistenceId("user-1337")) + .map(convertToReadSideTypes) // convert to datatype + .grouped(20) // batch inserts into groups of 20 + .runWith(Sink(dbBatchWriter)) // write batches to read-side database + //#projection-into-different-store-rs + } + + //#projection-into-different-store-actor + class TheOneWhoWritesToQueryJournal(id: String) extends Actor { + val store = new DummyStore() + + var state: ComplexState = ComplexState() + + def receive = { + case m => + state = updateState(state, m) + if (state.readyToSave) store.save(Record(state)) + } + + def updateState(state: ComplexState, msg: Any): ComplexState = { + // some complicated aggregation logic here ... + state + } + } + + //#projection-into-different-store-actor + +} + +class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) { + + def this() { + this( + """ + akka.persistence.query.noop-read-journal { + class = "docs.persistence.query.NoopReadJournal" + } + """.stripMargin) + } + + //#basic-usage + // obtain read journal by plugin id + val readJournal = + PersistenceQuery(system).readJournalFor("akka.persistence.query.noop-read-journal") + + // issue query to journal + val source: Source[Any, Unit] = + readJournal.query(EventsByPersistenceId("user-1337", 0, Long.MaxValue)) + + // materialize stream, consuming events + implicit val mat = ActorMaterializer() + source.runForeach { event => println("Event: " + event) } + //#basic-usage + + //#all-persistence-ids-live + readJournal.query(AllPersistenceIds) + //#all-persistence-ids-live + + //#all-persistence-ids-snap + readJournal.query(AllPersistenceIds, hints = NoRefresh) + //#all-persistence-ids-snap + + //#events-by-tag + // assuming journal is able to work with numeric offsets we can: + + val blueThings: Source[EventEnvelope, Unit] = + readJournal.query(EventsByTag("blue")) + + // find top 10 blue things: + val top10BlueThings: Future[Vector[Any]] = + blueThings + .map(_.event) + .take(10) // cancels the query stream after pulling 10 elements + .runFold(Vector.empty[Any])(_ :+ _) + + // start another query, from the known offset + val furtherBlueThings = readJournal.query(EventsByTag("blue", offset = 10)) + //#events-by-tag + + //#events-by-persistent-id-refresh + readJournal.query(EventsByPersistenceId("user-us-1337"), hints = RefreshInterval(1.second)) + + //#events-by-persistent-id-refresh + + //#advanced-journal-query-definition + final case class RichEvent(tags: immutable.Set[String], payload: Any) + + case class QueryStats(totalEvents: Long) + + case class ByTagsWithStats(tags: immutable.Set[String]) + extends Query[RichEvent, QueryStats] + + //#advanced-journal-query-definition + + //#advanced-journal-query-hints + + import scala.concurrent.duration._ + + readJournal.query(EventsByTag("blue"), hints = RefreshInterval(1.second)) + //#advanced-journal-query-hints + + //#advanced-journal-query-usage + val query: Source[RichEvent, QueryStats] = + readJournal.query(ByTagsWithStats(Set("red", "blue"))) + + query + .mapMaterializedValue { stats => println(s"Stats: $stats") } + .map { event => println(s"Event payload: ${event.payload}") } + .runWith(Sink.ignore) + + //#advanced-journal-query-usage + + //#materialized-query-metadata + // a plugin can provide: + case class QueryMetadata(deterministicOrder: Boolean, infinite: Boolean) + + case object AllEvents extends Query[Any, QueryMetadata] + + val events = readJournal.query(AllEvents) + events + .mapMaterializedValue { meta => + println(s"The query is: " + + s"ordered deterministically: ${meta.deterministicOrder}, " + + s"infinite: ${meta.infinite}") + } + + //#materialized-query-metadata + + //#projection-into-different-store + class MyResumableProjection(name: String) { + def saveProgress(offset: Long): Future[Long] = ??? + def latestOffset: Future[Long] = ??? + } + //#projection-into-different-store + + class RunWithActor { + //#projection-into-different-store-actor-run + import akka.pattern.ask + import system.dispatcher + implicit val timeout = Timeout(3.seconds) + + val bidProjection = new MyResumableProjection("bid") + + val writerProps = Props(classOf[TheOneWhoWritesToQueryJournal], "bid") + val writer = system.actorOf(writerProps, "bid-projection-writer") + + bidProjection.latestOffset.foreach { startFromOffset => + readJournal + .query(EventsByTag("bid", startFromOffset)) + .mapAsync(8) { envelope => (writer ? envelope.event).map(_ => envelope.offset) } + .mapAsync(1) { offset => bidProjection.saveProgress(offset) } + .runWith(Sink.ignore) + } + //#projection-into-different-store-actor-run + } + + class RunWithAsyncFunction { + //#projection-into-different-store-simple + trait ExampleStore { + def save(event: Any): Future[Unit] + } + //#projection-into-different-store-simple + + //#projection-into-different-store-simple + val store: ExampleStore = ??? + + readJournal + .query(EventsByTag("bid")) + .mapAsync(1) { e => store.save(e) } + .runWith(Sink.ignore) + //#projection-into-different-store-simple + } + +} + +class NoopReadJournal(sys: ExtendedActorSystem) extends ReadJournal { + override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = + Source.empty.mapMaterializedValue(_ => null.asInstanceOf[M]) +} diff --git a/akka-docs/rst/scala/index-actors.rst b/akka-docs/rst/scala/index-actors.rst index 0e77f0c225..ca09504ab7 100644 --- a/akka-docs/rst/scala/index-actors.rst +++ b/akka-docs/rst/scala/index-actors.rst @@ -12,6 +12,7 @@ Actors routing fsm persistence + persistence-query testing actordsl typed-actors diff --git a/akka-docs/rst/scala/persistence-query.rst b/akka-docs/rst/scala/persistence-query.rst new file mode 100644 index 0000000000..15435ba1e4 --- /dev/null +++ b/akka-docs/rst/scala/persistence-query.rst @@ -0,0 +1,251 @@ +.. _persistence-query-scala: + +################# +Persistence Query +################# + +Akka persistence query complements :ref:`persistence-scala` by providing a universal asynchronous stream based +query interface that various journal plugins can implement in order to expose their query capabilities. + +The most typical use case of persistence query is implementing the so-called query side (also known as "read side") +in the popular CQRS architecture pattern - in which the writing side of the application (e.g. implemented using akka +persistence) is completely separated from the "query side". Akka Persistence Query itself is *not* directly the query +side of an application, however it can help to migrate data from the write side to the query side database. In very +simple scenarios Persistence Query may be powerful enough to fulful the query needs of your app, however we highly +recommend (in the spirit of CQRS) of splitting up the write/read sides into separate datastores as the need arrises. + +While queries can be performed directly on the same datastore, it is also a very common pattern to use the queries +to create *projections* of the write-side's events and store them into a separate datastore which is optimised for more +complex queries. This architectural pattern of projecting the data into a query optimised datastore, with possibly some +transformation or canculations along the way is the core use-case and recommended style of using Akka Persistence Query +- pulling out of one Journal and storing into another one. + +.. warning:: + + This module is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to + improve this API based on our users’ feedback, which implies that while we try to keep incompatible + changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the + contents of the ``akka.persistence.query`` package. + +Dependencies +============ + +Akka persistence query is a separate jar file. Make sure that you have the following dependency in your project:: + + "com.typesafe.akka" %% "akka-persistence-query-experimental" % "@version@" @crossString@ + +Design overview +=============== + +Akka persistence query is purposely designed to be a very loosely specified API. +This is in order to keep the provided APIs general enough for each journal implementation to be able to expose its best +features, e.g. a SQL journal can use complex SQL queries or if a journal is able to subscribe to a live event stream +this should also be possible to expose the same API - a typed stream of events. + +**Each read journal must explicitly document which types of queries it supports.** +Refer to the your journal's plugins documentation for details on which queries and semantics it supports. + +While Akka Persistence Query does not provide actual implementations of ReadJournals, it defines a number of pre-defined +query types for the most common query scenarios, that most journals are likely to implement (however they are not required to). + +Read Journals +============= + +In order to issue queries one has to first obtain an instance of a ``ReadJournal``. +Read journals are implemented as `Community plugins`_, each targeting a specific datastore (for example Cassandra or JDBC +databases). For example, given a library that provides a ``akka.persistence.query.noop-read-journal`` obtaining the related +journal is as simple as: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#basic-usage + +Journal implementers are encouraged to put this identified in a variable known to the user, such that one can access it via +``journalFor(NoopJournal.identifier)``, however this is not enforced. + +Read journal implementations are available as `Community plugins`_. + + +Predefined queries +------------------ +Akka persistence query comes with a number of ``Query`` objects built in and suggests Journal implementors to implement +them according to the semantics described below. It is important to notice that while these query types are very common +a journal is not obliged to implement all of them - for example because in a given journal such query would be +significantly inefficient. + +.. note:: + Refer to the documentation of the ``ReadJournal`` plugin you are using for a specific list of supported query types. + For example, Journal plugins should document their stream completion strategies. + +The predefined queries are: + +``AllPersistenceIds`` which is designed to allow users to subscribe to a stream of all persistent ids in the system. +By default this stream should be assumed to be a "live" stream, which means that the journal should keep emitting new +persistence ids as they come into the system: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#all-persistence-ids-live + +If your usage does not require a live stream, you can disable refreshing by using *hints*, providing the built-in +``NoRefresh`` hint to the query: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#all-persistence-ids-snap + +``EventsByPersistenceId`` is a query equivalent to replaying a :ref:`PersistentActor `, +however, since it is a stream it is possible to keep it alive and watch for additional incoming events persisted by the +persistent actor identified by the given ``persistenceId``. Most journal will have to revert to polling in order to achieve +this, which can be configured using the ``RefreshInterval`` query hint: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#events-by-persistent-id-refresh + +``EventsByTag`` allows querying events regardles of which ``persistenceId`` they are associated with. This query is hard to +implement in some journals or may need some additional preparation of the used data store to be executed efficiently, +please refer to your read journal plugin's documentation to find out if and how it is supported. The goal of this query +is to allow querying for all events which are "tagged" with a specific tag - again, how exactly this is implemented +depends on the used journal. + +.. note:: + A very important thing to keep in mind when using queries spanning multiple persistenceIds, such as ``EventsByTag`` + is that the order of events at which the events appear in the stream rarely is guaranteed (or stable between materializations). + + Journals *may* choose to opt for strict ordering of the events, and should then document explicitly what kind of ordering + guarantee they provide - for example "*ordered by timestamp ascending, independently of persistenceId*" is easy to achieve + on relational databases, yet may be hard to implement efficiently on plain key-value datastores. + +In the example below we query all events which have been tagged (we assume this was performed by the write-side using an +:ref:`EventAdapter `, or that the journal is smart enough that it can figure out what we mean by this +tag - for example if the journal stored the events as json it may try to find those with the field ``tag`` set to this value etc.). + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#events-by-tag + +As you can see, we can use all the usual stream combinators available from `Akka Streams`_ on the resulting query stream, +including for example taking the first 10 and cancelling the stream. It is worth pointing out that the built-in ``EventsByTag`` +query has an optionally supported offset parameter (of type ``Long``) which the journals can use to implement resumable-streams. +For example a journal may be able to use a WHERE clause to begin the read starting from a specific row, or in a datastore +that is able to order events by insertion time it could treat the Long as a timestamp and select only older events. +Again, specific capabilities are specific to the journal you are using, so you have to + + +Materialized values of queries +------------------------------ +Journals are able to provide additional information related to a query by exposing `materialized values`_, +which are a feature of `Akka Streams`_ that allows to expose additional values at stream materialization time. + +More advanced query journals may use this technique to expose information about the character of the materialized +stream, for example if it's finite or infinite, strictly ordered or not ordered at all. The materialized value type +is defined as the ``M`` type parameter of a query (``Query[T,M]``), which allows journals to provide users with their +specialised query object, as demonstrated in the sample below: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#materialized-query-metadata + +.. _materialized values: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-quickstart.html#Materialized_values +.. _Akka Streams: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala.html +.. _Community plugins: http://akka.io/community/#plugins-to-akka-persistence-query + +Performance and denormalization +=============================== +When building systems using :ref:`event-sourcing` and CQRS (`Command & Query Responsibility Segragation`_) techniques +it is tremendously important to realise that the write-side has completely different needs from the read-side, +and separating those concerns into datastores that are optimised for either side makes it possible to offer the best +expirience for the write and read sides independently. + +For example, in a bidding system it is important to "take the write" and respond to the bidder that we have accepted +the bid as soon as possible, which means that write-throughput is of highest importance for the write-side – often this +means that data stores which are able to scale to accomodate these requirements have a less expressive query side. + +On the other hand the same application may have some complex statistics view or we may have analists working with the data +to figure out best bidding strategies and trends – this often requires some kind of expressive query capabilities like +for example SQL or writing Spark jobs to analyse the data. Trefore the data stored in the write-side needs to be +projected into the other read-optimised datastore. + +.. note:: + When refering to **Materialized Views** in Akka Persistence think of it as "some persistent storage of the result of a Query". + In other words, it means that the view is created once, in order to be afterwards queries multiple times, as in this format + it may be more efficient or interesting to query it (instead of the source events directly). + +Materialize view to Reactive Streams compatible datastore +--------------------------------------------------------- + +If the read datastore exposes it an `Reactive Streams`_ interface then implementing a simple projection +is as simple as, using the read-journal and feeding it into the databases driver interface, for example like so: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#projection-into-different-store-rs + +Materialize view using mapAsync +------------------------------- + +If the target database does not provide a reactive streams ``Subscriber`` that can perform writes, +you may have to implement the write logic using plain functions or Actors instead. + +In case your write logic is state-less and you just need to convert the events from one data data type to another +before writing into the alternative datastore, then the projection is as simple as: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#projection-into-different-store-simple + +Resumable projections +--------------------- + +Sometimes you may need to implement "resumable" projections, that will not start from the beginning of time each time +when run. In this case you will need to store the sequence number (or ``offset``) of the processed event and use it +the next time this projection is started. This pattern is not built-in, however is rather simple to implement yourself. + +The example below additionally highlights how you would use Actors to implement the write side, in case +you need to do some complex logic that would be best handled inside an Actor before persisting the event +into the other datastore: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#projection-into-different-store-actor-run + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#projection-into-different-store-actor + +.. _Command & Query Responsibility Segragation: https://msdn.microsoft.com/en-us/library/jj554200.aspx + +.. _read-journal-plugin-api-scala: + +Query plugins +============= + +Query plugins are various (mostly community driven) :class:`ReadJournal` implementations for all kinds +of available datastores. The complete list of available plugins is maintained on the Akka Persistence Query `Community Plugins`_ page. + +This section aims to provide tips and guide plugin developers through implementing a custom query plugin. +Most users will not need to implement journals themselves, except if targeting a not yet supported datastore. + +.. note:: + Since different data stores provide different query capabilities journal plugins **must extensively document** + their exposed semantics as well as handled query scenarios. + +ReadJournal plugin API +---------------------- + +Journals *MUST* return a *failed* ``Source`` if they are unable to execute the passed in query. +For example if the user accidentally passed in an ``SqlQuery()`` to a key-value journal. + +Below is a simple journal implementation: + +.. includecode:: code/docs/persistence/query/PersistenceQueryDocSpec.scala#my-read-journal + +And the ``EventsByTag`` could be backed by such an Actor for example: + +.. includecode:: code/docs/persistence/query/MyEventsByTagPublisher.scala#events-by-tag-publisher + +More journal example implementations +------------------------------------ + +In order to help implementers get get started with implementing read journals a number of reference implementaions +have been prepared, each highlighting a specific style a journal might need to be implemented in: + +* TODO LINK HERE – when the backing data store is unable to push events, nor does it expose an reactive streams interface, + yet has rich query capabilities (like an SQL database for example) +* TODO LINK HERE – when a `Reactive Streams`_ adapter or driver is available for the datastore, yet it is not able to handle + polling by itself. For example when using `Slick 3`_ along side with a typical SQL database. +* TODO LINK HERE – when the backing datastore already has a fully "reactive push/pull" adapter implemented, for example + such exist for Kafka (see the `Reactive Kafka`_ project by Krzysztof Ciesielski for details). + +.. _Reactive Kafka: https://github.com/softwaremill/reactive-kafka +.. _Reactive Streams: http://reactive-streams.org +.. _Slick 3: http://slick.typesafe.com/ + + +Plugin TCK +---------- + +TODO, not available yet. + + diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 2c28f9092a..0289298ec2 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -68,7 +68,7 @@ Architecture * *AtLeastOnceDelivery*: To send messages with at-least-once delivery semantics to destinations, also in case of sender and receiver JVM crashes. -* *Journal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages +* *AsyncWriteJournal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages are journaled and which are received by the persistent actor without being journaled. The storage backend of a journal is pluggable. Persistence extension comes with a "leveldb" journal plugin, which writes to the local filesystem, and replicated journals are available as `Community plugins`_. @@ -638,6 +638,7 @@ Event Adapters help in situations where: understand JSON it is possible to write an EventAdapter ``toJournal:Any=>JSON`` such that the Journal can *directly* store the json instead of serializing the object to its binary representation. +.. image:: ../images/persistence-eventadapter.png Implementing an EventAdapter is rather stright forward: diff --git a/akka-persistence-query/build.sbt b/akka-persistence-query/build.sbt new file mode 100644 index 0000000000..1b57eef61d --- /dev/null +++ b/akka-persistence-query/build.sbt @@ -0,0 +1,16 @@ +import akka.{ AkkaBuild, Dependencies, Formatting, ScaladocNoVerificationOfDiagrams, OSGi } +import com.typesafe.tools.mima.plugin.MimaKeys + +AkkaBuild.defaultSettings + +AkkaBuild.experimentalSettings + +Formatting.formatSettings + +OSGi.persistenceQuery + +Dependencies.persistenceQuery + +//MimaKeys.previousArtifact := akkaPreviousArtifact("akka-persistence-query-experimental").value + +enablePlugins(ScaladocNoVerificationOfDiagrams) diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala new file mode 100644 index 0000000000..5b6e35e263 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/Hint.scala @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.persistence.query + +import scala.concurrent.duration.FiniteDuration + +/** + * A query hint that defines how to execute the query, + * typically specific to the journal implementation. + * + * A plugin may optionally support a [[Hint]]. + */ +trait Hint + +/** + * If the underlying datastore only supports queries that are completed when they reach the + * end of the "result set", the journal has to submit new queries after a while in order + * to support "infinite" event streams that include events stored after the initial query has completed. + * + * A plugin may optionally support this [[Hint]] for defining such a refresh interval. + */ +final case class RefreshInterval(interval: FiniteDuration) extends Hint + +/** + * Indicates that the event stream is supposed to be completed immediately when it + * reaches the end of the "result set", as described in [[RefreshInterval]]. + * + */ +final case object NoRefresh extends NoRefresh { + /** Java API */ + def getInstance: NoRefresh = this +} +sealed class NoRefresh extends Hint + diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala new file mode 100644 index 0000000000..bb93297b33 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/PersistenceQuery.scala @@ -0,0 +1,89 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.persistence.query + +import java.util.concurrent.atomic.AtomicReference + +import akka.actor._ +import akka.event.Logging + +import scala.annotation.tailrec +import scala.util.Failure + +/** + * Persistence extension for queries. + */ +object PersistenceQuery extends ExtensionId[PersistenceQuery] with ExtensionIdProvider { + /** + * Java API. + */ + override def get(system: ActorSystem): PersistenceQuery = super.get(system) + + def createExtension(system: ExtendedActorSystem): PersistenceQuery = new PersistenceQuery(system) + + def lookup() = PersistenceQuery + + /** INTERNAL API. */ + private[persistence] case class PluginHolder(plugin: scaladsl.ReadJournal) extends Extension + +} + +class PersistenceQuery(system: ExtendedActorSystem) extends Extension { + import PersistenceQuery._ + + private val log = Logging(system, getClass) + + /** Discovered query plugins. */ + private val readJournalPluginExtensionIds = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty) + + /** + * Returns the [[akka.persistence.query.scaladsl.ReadJournal]] specified by the given read journal configuration entry. + */ + @tailrec final def readJournalFor(readJournalPluginId: String): scaladsl.ReadJournal = { + val configPath = readJournalPluginId + val extensionIdMap = readJournalPluginExtensionIds.get + extensionIdMap.get(configPath) match { + case Some(extensionId) ⇒ + extensionId(system).plugin + case None ⇒ + val extensionId = new ExtensionId[PluginHolder] { + override def createExtension(system: ExtendedActorSystem): PluginHolder = + PluginHolder(createPlugin(configPath)) + } + readJournalPluginExtensionIds.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) + readJournalFor(readJournalPluginId) // Recursive invocation. + } + } + + /** + * Java API + * + * Returns the [[akka.persistence.query.javadsl.ReadJournal]] specified by the given read journal configuration entry. + */ + final def getReadJournalFor(readJournalPluginId: String): javadsl.ReadJournal = + new javadsl.ReadJournal(readJournalFor(readJournalPluginId)) + + private def createPlugin(configPath: String): scaladsl.ReadJournal = { + require(!isEmpty(configPath) && system.settings.config.hasPath(configPath), + s"'reference.conf' is missing persistence read journal plugin config path: '${configPath}'") + val pluginActorName = configPath + val pluginConfig = system.settings.config.getConfig(configPath) + val pluginClassName = pluginConfig.getString("class") + log.debug(s"Create plugin: ${pluginActorName} ${pluginClassName}") + val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get + + val plugin = system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, (classOf[ExtendedActorSystem], system) :: Nil) + .orElse(system.dynamicAccess.createInstanceFor[scaladsl.ReadJournal](pluginClass, Nil)) + .recoverWith { + case ex: Exception ⇒ Failure.apply(new IllegalArgumentException(s"Unable to create read journal plugin instance for path [$configPath], class [$pluginClassName]!", ex)) + } + + // TODO possibly apply event adapters here + plugin.get + } + + /** Check for default or missing identity. */ + private def isEmpty(text: String) = text == null || text.length == 0 +} + diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala new file mode 100644 index 0000000000..fdca0ec0c3 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/Query.scala @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.persistence.query + +/** + * General interface for all queries. There are a few pre-defined queries, + * such as [[EventsByPersistenceId]], [[AllPersistenceIds]] and [[EventsByTag]] + * but implementation of these queries are optional. Query (journal) plugins + * may define their own specialized queries. + * + * If a query plugin does not support a query it will return a stream that + * will be completed with a failure of [[UnsupportedOperationException]]. + */ +trait Query[T, M] + +/** + * Query all `PersistentActor` identifiers, i.e. as defined by the + * `persistenceId` of the `PersistentActor`. + * + * A plugin may optionally support this [[Query]]. + */ +final case object AllPersistenceIds extends Query[String, Unit] + +/** + * Query events for a specific `PersistentActor` identified by `persistenceId`. + * + * You can retrieve a subset of all events by specifying `fromSequenceNr` and `toSequenceNr` + * or use `0L` and `Long.MaxValue` respectively to retrieve all events. + * + * The returned event stream should be ordered by sequence number. + * + * A plugin may optionally support this [[Query]]. + */ +final case class EventsByPersistenceId(persistenceId: String, fromSequenceNr: Long = 0L, toSequenceNr: Long = Long.MaxValue) + extends Query[Any, Unit] + +/** + * Query events that have a specific tag. A tag can for example correspond to an + * aggregate root type (in DDD terminology). + * + * The consumer can keep track of its current position in the event stream by storing the + * `offset` and restart the query from a given `offset` after a crash/restart. + * + * The exact meaning of the `offset` depends on the journal and must be documented by the + * read journal plugin. It may be a sequential id number that uniquely identifies the + * position of each event within the event stream. Distributed data stores cannot easily + * support those semantics and they may use a weaker meaning. For example it may be a + * timestamp (taken when the event was created or stored). Timestamps are not unique and + * not strictly ordered, since clocks on different machines may not be synchronized. + * + * The returned event stream should be ordered by `offset` if possible, but this can also be + * difficult to fulfill for a distributed data store. The order must be documented by the + * read journal plugin. + * + * A plugin may optionally support this [[Query]]. + */ +final case class EventsByTag(tag: String, offset: Long = 0L) extends Query[EventEnvelope, Unit] + +/** + * Event wrapper adding meta data for the events in the result stream of + * [[EventsByTag]] query, or similar queries. + */ +//#event-envelope +final case class EventEnvelope( + offset: Long, + persistenceId: String, + sequenceNr: Long, + event: Any) +//#event-envelope diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala new file mode 100644 index 0000000000..7caef0a19b --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/ReadJournal.scala @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query.javadsl + +import akka.persistence.query.{ Query, Hint } +import akka.stream.javadsl.Source + +import scala.annotation.varargs + +/** + * Java API + * + * API for reading persistent events and information derived + * from stored persistent events. + * + * The purpose of the API is not to enforce compatibility between different + * journal implementations, because the technical capabilities may be very different. + * The interface is very open so that different journals may implement specific queries. + * + * Usage: + * {{{ + * final ReadJournal journal = + * PersistenceQuery.get(system).getReadJournalFor(queryPluginConfigPath); + * + * final Source<EventEnvelope, ?> events = + * journal.query(new EventsByTag("mytag", 0L)); + * }}} + */ + +final class ReadJournal(backing: akka.persistence.query.scaladsl.ReadJournal) { + + /** + * Java API + * + * A query that returns a `Source` with output type `T` and materialized value `M`. + * + * The `hints` are optional parameters that defines how to execute the + * query, typically specific to the journal implementation. + * + */ + @varargs def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = + backing.query(q, hints: _*).asJava + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala new file mode 100644 index 0000000000..02824269d3 --- /dev/null +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/ReadJournal.scala @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query.scaladsl + +import akka.persistence.query.{ Hint, Query } +import akka.stream.scaladsl.Source + +/** + * API for reading persistent events and information derived + * from stored persistent events. + * + * The purpose of the API is not to enforce compatibility between different + * journal implementations, because the technical capabilities may be very different. + * The interface is very open so that different journals may implement specific queries. + * + * Usage: + * {{{ + * val journal = PersistenceQuery(system).readJournalFor(queryPluginConfigPath) + * val events = journal.query(EventsByTag("mytag", 0L)) + * }}} + * + * For Java API see [[akka.persistence.query.javadsl.ReadJournal]]. + */ +abstract class ReadJournal { + + /** + * A query that returns a `Source` with output type `T` and materialized + * value `M`. + * + * The `hints` are optional parameters that defines how to execute the + * query, typically specific to the journal implementation. + * + */ + def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] + +} diff --git a/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java b/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java new file mode 100644 index 0000000000..9714ac8632 --- /dev/null +++ b/akka-persistence-query/src/test/java/akka/persistence/query/PersistenceQueryTest.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query; + +import akka.actor.ActorSystem; +import akka.persistence.query.javadsl.ReadJournal; +import akka.testkit.AkkaJUnitActorSystemResource; +import org.junit.ClassRule; +import scala.runtime.BoxedUnit; + +public class PersistenceQueryTest { + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = + new AkkaJUnitActorSystemResource(PersistenceQueryTest.class.getName()); + + private final ActorSystem system = actorSystemResource.getSystem(); + + private final Hint hint = NoRefresh.getInstance(); + + // compile-only test + @SuppressWarnings("unused") + public void shouldExposeJavaDSLFriendlyQueryJournal() throws Exception { + final ReadJournal readJournal = PersistenceQuery.get(system).getReadJournalFor("noop-journal"); + final akka.stream.javadsl.Source tag = readJournal.query(new EventsByTag("tag", 0L), hint, hint); // java varargs + } +} diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/MockReadJournal.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/MockReadJournal.scala new file mode 100644 index 0000000000..ea813a1cd5 --- /dev/null +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/MockReadJournal.scala @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query + +import akka.stream.scaladsl.Source +import com.typesafe.config.{ Config, ConfigFactory } + +/** + * Use for tests only! + * Emits infinite stream of strings (representing queried for events). + */ +class MockReadJournal extends scaladsl.ReadJournal { + override def query[T, M](q: Query[T, M], hints: Hint*): Source[T, M] = + Source(() ⇒ Iterator.from(0)).map(_.toString).asInstanceOf[Source[T, M]] +} + +object MockReadJournal { + final val Identifier = "akka.persistence.query.journal.mock" + + final val config: Config = ConfigFactory.parseString( + s""" + |$Identifier { + | class = "${classOf[MockReadJournal].getCanonicalName}" + |} + """.stripMargin) +} \ No newline at end of file diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala new file mode 100644 index 0000000000..44d442fe03 --- /dev/null +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/PersistenceQuerySpec.scala @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence.query + +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor.ActorSystem +import akka.persistence.journal.{ EventAdapter, EventSeq } +import com.typesafe.config.ConfigFactory +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } +import org.scalautils.ConversionCheckedTripleEquals + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class PersistenceQuerySpec extends WordSpecLike with Matchers with BeforeAndAfterAll with ConversionCheckedTripleEquals { + + val anything: Query[String, _] = null + + val eventAdaptersConfig = + s""" + |akka.persistence.query.journal.mock { + | event-adapters { + | adapt = ${classOf[PrefixStringWithPAdapter].getCanonicalName} + | } + |} + """.stripMargin + + "ReadJournal" must { + "be found by full config key" in { + withActorSystem() { system ⇒ + PersistenceQuery.get(system).readJournalFor(MockReadJournal.Identifier) + } + } + + "throw if unable to find query journal by config key" in { + withActorSystem() { system ⇒ + intercept[IllegalArgumentException] { + PersistenceQuery.get(system).readJournalFor(MockReadJournal.Identifier + "-unknown") + }.getMessage should include("missing persistence read journal") + } + } + } + + private val systemCounter = new AtomicInteger() + private def withActorSystem(conf: String = "")(block: ActorSystem ⇒ Unit): Unit = { + val config = + MockReadJournal.config + .withFallback(ConfigFactory.parseString(conf)) + .withFallback(ConfigFactory.parseString(eventAdaptersConfig)) + .withFallback(ConfigFactory.load()) + + val sys = ActorSystem(s"sys-${systemCounter.incrementAndGet()}", config) + try block(sys) finally Await.ready(sys.terminate(), 10.seconds) + } +} + +object ExampleQueryModels { + case class OldModel(value: String) { def promote = NewModel(value) } + case class NewModel(value: String) +} + +class PrefixStringWithPAdapter extends EventAdapter { + override def fromJournal(event: Any, manifest: String) = EventSeq.single("p-" + event) + + override def manifest(event: Any) = "" + override def toJournal(event: Any) = throw new Exception("toJournal should not be called by query side") +} \ No newline at end of file diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index c6cfd7f9f7..a2ab16dc8d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -219,7 +219,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { * When configured, uses `journalPluginId` as absolute path to the journal configuration entry. * Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`. */ - @tailrec final def journalFor(journalPluginId: String): ActorRef = { + @tailrec private[akka] final def journalFor(journalPluginId: String): ActorRef = { val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId val extensionIdMap = journalPluginExtensionId.get extensionIdMap.get(configPath) match { @@ -239,12 +239,14 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { } /** + * INTERNAL API + * * Returns a snapshot store plugin actor identified by `snapshotPluginId`. * When empty, looks in `akka.persistence.snapshot-store.plugin` to find configuration entry path. * When configured, uses `snapshotPluginId` as absolute path to the snapshot store configuration entry. * Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`. */ - @tailrec final def snapshotStoreFor(snapshotPluginId: String): ActorRef = { + @tailrec private[akka] final def snapshotStoreFor(snapshotPluginId: String): ActorRef = { val configPath = if (isEmpty(snapshotPluginId)) defaultSnapshotPluginId else snapshotPluginId val extensionIdMap = snapshotPluginExtensionId.get extensionIdMap.get(configPath) match { diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index d0906c395f..6e8b03c402 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -41,12 +41,12 @@ final case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) extends Per // only check that all persistenceIds are equal when there's more than one in the Seq if (payload match { - case l: List[PersistentRepr] => l.tail.nonEmpty - case v: Vector[PersistentRepr] => v.size > 1 - case _ => true // some other collection type, let's just check + case l: List[PersistentRepr] ⇒ l.tail.nonEmpty + case v: Vector[PersistentRepr] ⇒ v.size > 1 + case _ ⇒ true // some other collection type, let's just check }) require(payload.forall(_.persistenceId == payload.head.persistenceId), - "AtomicWrite must contain messages for the same persistenceId, " + - s"yet different persistenceIds found: ${payload.map(_.persistenceId).toSet}") + "AtomicWrite must contain messages for the same persistenceId, " + + s"yet different persistenceIds found: ${payload.map(_.persistenceId).toSet}") def persistenceId = payload.head.persistenceId def lowestSequenceNr = payload.head.sequenceNr // this assumes they're gapless; they should be (it is only our code creating AWs) diff --git a/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java b/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java index fef46cc316..d660cc79ab 100644 --- a/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java +++ b/akka-persistence/src/test/java/akka/persistence/fsm/AbstractPersistentFSMTest.java @@ -42,6 +42,7 @@ import static org.junit.matchers.JUnitMatchers.hasItems; public class AbstractPersistentFSMTest { private static Option none = Option.none(); + @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("PersistentFSMJavaTest", PersistenceSpec.config( diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala index 0eb78a9c46..ff2224e858 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala @@ -204,7 +204,7 @@ class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config( expectMsg("Failure: wrong-1") expectTerminated(persistentActor) } - "call onPersistFailure and stop if persistAsync fails xoxo" in { + "call onPersistFailure and stop if persistAsync fails" in { system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[AsyncPersistPersistentActor], name) val persistentActor = expectMsgType[ActorRef] persistentActor ! Cmd("a") diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index e49ce06c35..7e9e7831b3 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -54,7 +54,7 @@ object AkkaBuild extends Build { ), aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, clusterMetrics, clusterTools, clusterSharding, distributedData, - slf4j, agent, persistence, persistenceTck, kernel, osgi, docs, contrib, samples, multiNodeTestkit, benchJmh, typed) + slf4j, agent, persistence, persistenceQuery, persistenceTck, kernel, osgi, docs, contrib, samples, multiNodeTestkit, benchJmh, typed) ) lazy val akkaScalaNightly = Project( @@ -64,7 +64,7 @@ object AkkaBuild extends Build { // samples don't work with dbuild right now aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, clusterMetrics, clusterTools, clusterSharding, distributedData, - slf4j, persistence, persistenceTck, kernel, osgi, contrib, multiNodeTestkit, benchJmh, typed) + slf4j, persistence, persistenceQuery, persistenceTck, kernel, osgi, contrib, multiNodeTestkit, benchJmh, typed) ).disablePlugins(ValidatePullRequest) lazy val actor = Project( @@ -163,6 +163,12 @@ object AkkaBuild extends Build { dependencies = Seq(actor, remote % "test->test", testkit % "test->test") ) + lazy val persistenceQuery = Project( + id = "akka-persistence-query-experimental", + base = file("akka-persistence-query"), + dependencies = Seq(persistence % "compile;provided->provided;test->test", testkit % "compile;test->test") + ) + lazy val persistenceTck = Project( id = "akka-persistence-experimental-tck", base = file("akka-persistence-tck"), @@ -192,7 +198,7 @@ object AkkaBuild extends Build { base = file("akka-docs"), dependencies = Seq(actor, testkit % "test->test", remote % "compile;test->test", cluster, clusterMetrics, slf4j, agent, camel, osgi, - persistence % "compile;provided->provided;test->test", persistenceTck, + persistence % "compile;provided->provided;test->test", persistenceTck, persistenceQuery, typed % "compile;test->test", distributedData) ) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 78c5277801..0135c8baf5 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -19,6 +19,10 @@ object Dependencies { object Compile { // Compile + + // Akka Streams // FIXME: change to project dependency once merged before 2.4.0 + val akkaStream = "com.typesafe.akka" %% "akka-stream-experimental" % "1.0" + val camelCore = "org.apache.camel" % "camel-core" % "2.13.4" exclude("org.slf4j", "slf4j-api") // ApacheV2 // when updating config version, update links ActorSystem ScalaDoc to link to the updated version @@ -108,6 +112,8 @@ object Dependencies { val persistence = l ++= Seq(protobuf, Provided.levelDB, Provided.levelDBNative, Test.scalatest.value, Test.junit, Test.commonsIo, Test.scalaXml) + val persistenceQuery = l ++= Seq(akkaStream, Test.scalatest.value, Test.junit, Test.commonsIo) + val persistenceTck = l ++= Seq(Test.scalatest.value.copy(configurations = Some("compile")), Test.junit.copy(configurations = Some("compile"))) val kernel = l ++= Seq(Test.scalatest.value, Test.junit) diff --git a/project/OSGi.scala b/project/OSGi.scala index 658c76673b..ad3900f927 100644 --- a/project/OSGi.scala +++ b/project/OSGi.scala @@ -47,6 +47,8 @@ object OSGi { val persistence = exports(Seq("akka.persistence.*"), imports = Seq(protobufImport())) + val persistenceQuery = exports(Seq("akka.persistence.query.*")) + val testkit = exports(Seq("akka.testkit.*")) val osgiOptionalImports = Seq(