diff --git a/akka-docs/rst/experimental/index-java.rst b/akka-docs/rst/experimental/index-java.rst index c2d8e70186..c3e8fb91ed 100644 --- a/akka-docs/rst/experimental/index-java.rst +++ b/akka-docs/rst/experimental/index-java.rst @@ -22,7 +22,6 @@ prior deprecation. ../dev/multi-node-testing ../java/lambda-actors ../java/lambda-fsm - ../java/persistence-query Another reason for marking a module as experimental is that it's too early to tell if the module has a maintainer that can take the responsibility diff --git a/akka-docs/rst/experimental/index.rst b/akka-docs/rst/experimental/index.rst index f9e444dc46..2dd65841d2 100644 --- a/akka-docs/rst/experimental/index.rst +++ b/akka-docs/rst/experimental/index.rst @@ -22,7 +22,6 @@ prior deprecation. ../dev/multi-node-testing ../java/lambda-actors ../java/lambda-fsm - ../scala/persistence-query ../scala/typed Another reason for marking a module as experimental is that it's too early diff --git a/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java b/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java index 66b6f4d06d..1c22fcca53 100644 --- a/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java +++ b/akka-docs/rst/java/code/docs/persistence/query/MyEventsByTagJavaPublisher.java @@ -9,6 +9,7 @@ import akka.actor.Scheduler; import akka.japi.Pair; import akka.japi.pf.ReceiveBuilder; import akka.persistence.PersistentRepr; +import akka.persistence.query.Offset; import akka.serialization.Serialization; import akka.serialization.SerializationExtension; import akka.stream.actor.AbstractActorPublisher; @@ -107,7 +108,7 @@ class MyEventsByTagJavaPublisher extends AbstractActorPublisher { final PersistentRepr p = serialization.deserialize(bytes, PersistentRepr.class).get(); - return new EventEnvelope(id, p.persistenceId(), p.sequenceNr(), p.payload()); + return new EventEnvelope(Offset.sequence(id), p.persistenceId(), p.sequenceNr(), p.payload()); }).collect(toList()); } } catch(Exception e) { @@ -121,4 +122,4 @@ class MyEventsByTagJavaPublisher extends AbstractActorPublisher { onNext(buf.remove(0)); } } -//#events-by-tag-publisher \ No newline at end of file +//#events-by-tag-publisher diff --git a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst index 4d95ce0c56..85872b5782 100644 --- a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst +++ b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst @@ -113,9 +113,6 @@ Instead of the previous ``Long`` offset you can now use the provided ``Offset`` Journals are also free to provide their own specific ``Offset`` types. Consult your journal plugin's documentation for details. ------ - - Cluster ======= diff --git a/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala index e630e56bff..2a60e84fc2 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala @@ -6,7 +6,7 @@ package docs.persistence.query import akka.NotUsed import akka.persistence.journal.{ EventAdapter, EventSeq } import akka.testkit.AkkaSpec -import akka.persistence.query.{ EventEnvelope, EventEnvelope, PersistenceQuery, Sequence } +import akka.persistence.query.{ EventEnvelope, PersistenceQuery, Sequence } import akka.persistence.query.scaladsl._ import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal import akka.persistence.journal.Tagged diff --git a/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala index 92e941f143..d9166d5620 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/MyEventsByTagPublisher.scala @@ -6,7 +6,7 @@ package docs.persistence.query import akka.actor.Props import akka.persistence.PersistentRepr -import akka.persistence.query.{ EventEnvelope, EventEnvelope, Sequence } +import akka.persistence.query.{ EventEnvelope, Sequence } import akka.serialization.SerializationExtension import akka.stream.actor.ActorPublisher import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request } diff --git a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala index e69c304b4e..f15a35e978 100644 --- a/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/query/PersistenceQueryDocSpec.scala @@ -60,10 +60,6 @@ object PersistenceQueryDocSpec { val props = MyEventsByTagPublisher.props(tag, offsetValue, refreshInterval) Source.actorPublisher[EventEnvelope](props) .mapMaterializedValue(_ => NotUsed) - .map { - case EventEnvelope(offset, id, seqNr, event) => - EventEnvelope(Sequence(offset), id, seqNr, event) - } case _ ⇒ throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets") } diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/Offset.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/Offset.scala index 75a61b3de3..8373815d1a 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/Offset.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/Offset.scala @@ -15,7 +15,7 @@ object Offset { } -trait Offset +abstract class Offset final case class Sequence(value: Long) extends Offset with Ordered[Sequence] { override def compare(that: Sequence): Int = value.compare(that.value) diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsPublisher.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsPublisher.scala index 055ee03230..44be00175d 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsPublisher.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsPublisher.scala @@ -26,6 +26,7 @@ private[akka] object AllPersistenceIdsPublisher { /** * INTERNAL API */ +// FIXME needs a be rewritten as a GraphStage (since 2.5.0) private[akka] class AllPersistenceIdsPublisher(liveQuery: Boolean, maxBufSize: Int, writeJournalPluginId: String) extends ActorPublisher[String] with DeliveryBuffer[String] with ActorLogging { diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdPublisher.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdPublisher.scala index e5215658e6..c878f385c7 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdPublisher.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdPublisher.scala @@ -4,16 +4,14 @@ package akka.persistence.query.journal.leveldb import scala.concurrent.duration._ -import akka.actor.ActorLogging -import akka.actor.ActorRef -import akka.actor.Props +import akka.actor.{ ActorLogging, ActorRef, Cancellable, Props } import akka.persistence.JournalProtocol._ import akka.persistence.Persistence import akka.stream.actor.ActorPublisher import akka.stream.actor.ActorPublisherMessage.Cancel import akka.stream.actor.ActorPublisherMessage.Request import akka.persistence.journal.leveldb.LeveldbJournal -import akka.persistence.query.EventEnvelope +import akka.persistence.query.{ EventEnvelope, Sequence } /** * INTERNAL API @@ -40,6 +38,7 @@ private[akka] object EventsByPersistenceIdPublisher { /** * INTERNAL API */ +// FIXME needs a be rewritten as a GraphStage (since 2.5.0) private[akka] abstract class AbstractEventsByPersistenceIdPublisher( val persistenceId: String, val fromSequenceNr: Long, val maxBufSize: Int, val writeJournalPluginId: String) @@ -89,7 +88,7 @@ private[akka] abstract class AbstractEventsByPersistenceIdPublisher( def replaying(limit: Int): Receive = { case ReplayedMessage(p) ⇒ buf :+= EventEnvelope( - offset = p.sequenceNr, + offset = Sequence(p.sequenceNr), persistenceId = persistenceId, sequenceNr = p.sequenceNr, event = p.payload) @@ -120,6 +119,7 @@ private[akka] abstract class AbstractEventsByPersistenceIdPublisher( /** * INTERNAL API */ +// FIXME needs a be rewritten as a GraphStage (since 2.5.0) private[akka] class LiveEventsByPersistenceIdPublisher( persistenceId: String, fromSequenceNr: Long, override val toSequenceNr: Long, refreshInterval: FiniteDuration, @@ -128,7 +128,7 @@ private[akka] class LiveEventsByPersistenceIdPublisher( persistenceId, fromSequenceNr, maxBufSize, writeJournalPluginId) { import EventsByPersistenceIdPublisher._ - val tickTask = + val tickTask: Cancellable = context.system.scheduler.schedule(refreshInterval, refreshInterval, self, Continue)(context.dispatcher) override def postStop(): Unit = diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagPublisher.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagPublisher.scala index 4f2d41d41b..dcb872a741 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagPublisher.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagPublisher.scala @@ -4,16 +4,14 @@ package akka.persistence.query.journal.leveldb import scala.concurrent.duration._ -import akka.actor.ActorLogging -import akka.actor.ActorRef -import akka.actor.Props +import akka.actor.{ ActorLogging, ActorRef, Cancellable, Props } import akka.persistence.JournalProtocol._ import akka.persistence.Persistence import akka.stream.actor.ActorPublisher import akka.stream.actor.ActorPublisherMessage.Cancel import akka.stream.actor.ActorPublisherMessage.Request import akka.persistence.journal.leveldb.LeveldbJournal -import akka.persistence.query.EventEnvelope +import akka.persistence.query.{ EventEnvelope, Sequence } import akka.persistence.journal.leveldb.LeveldbJournal.ReplayTaggedMessages import akka.persistence.journal.leveldb.LeveldbJournal.ReplayedTaggedMessage @@ -42,6 +40,7 @@ private[akka] object EventsByTagPublisher { /** * INTERNAL API */ +// FIXME needs a be rewritten as a GraphStage private[akka] abstract class AbstractEventsByTagPublisher( val tag: String, val fromOffset: Long, val maxBufSize: Int, val writeJournalPluginId: String) @@ -91,7 +90,7 @@ private[akka] abstract class AbstractEventsByTagPublisher( def replaying(limit: Int): Receive = { case ReplayedTaggedMessage(p, _, offset) ⇒ buf :+= EventEnvelope( - offset = offset, + offset = Sequence(offset), persistenceId = p.persistenceId, sequenceNr = p.sequenceNr, event = p.payload) @@ -122,6 +121,7 @@ private[akka] abstract class AbstractEventsByTagPublisher( /** * INTERNAL API */ +// FIXME needs a be rewritten as a GraphStage (since 2.5.0) private[akka] class LiveEventsByTagPublisher( tag: String, fromOffset: Long, override val toOffset: Long, refreshInterval: FiniteDuration, @@ -130,7 +130,7 @@ private[akka] class LiveEventsByTagPublisher( tag, fromOffset, maxBufSize, writeJournalPluginId) { import EventsByTagPublisher._ - val tickTask = + val tickTask: Cancellable = context.system.scheduler.schedule(refreshInterval, refreshInterval, self, Continue)(context.dispatcher) override def postStop(): Unit = @@ -159,6 +159,7 @@ private[akka] class LiveEventsByTagPublisher( /** * INTERNAL API */ +// FIXME needs a be rewritten as a GraphStage (since 2.5.0) private[akka] class CurrentEventsByTagPublisher( tag: String, fromOffset: Long, var _toOffset: Long, maxBufSize: Int, writeJournalPluginId: String) diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala index 635a62f75a..bc1a2896b7 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala @@ -4,7 +4,7 @@ package akka.persistence.query.journal.leveldb.javadsl import akka.NotUsed -import akka.persistence.query.{ EventEnvelope, EventEnvelope, Offset } +import akka.persistence.query.{ EventEnvelope, Offset } import akka.persistence.query.javadsl._ import akka.stream.javadsl.Source @@ -26,14 +26,9 @@ import akka.stream.javadsl.Source */ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal) extends ReadJournal - with PersistenceIdsQuery - with CurrentPersistenceIdsQuery - with EventsByPersistenceIdQuery - with CurrentEventsByPersistenceIdQuery - with EventsByTagQuery - with EventsByTagQuery - with CurrentEventsByTagQuery - with CurrentEventsByTagQuery { + with PersistenceIdsQuery with CurrentPersistenceIdsQuery + with EventsByPersistenceIdQuery with CurrentEventsByPersistenceIdQuery + with EventsByTagQuery with CurrentEventsByTagQuery { /** * `allPersistenceIds` is used for retrieving all `persistenceIds` of all @@ -141,9 +136,6 @@ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.lev override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = scaladslReadJournal.eventsByTag(tag, offset).asJava - override def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] = - scaladslReadJournal.eventsByTag(tag, offset).asJava - /** * Same type of query as [[#eventsByTag]] but the event stream * is completed immediately when it reaches the end of the "result set". Events that are @@ -152,8 +144,6 @@ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.lev override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = scaladslReadJournal.currentEventsByTag(tag, offset).asJava - override def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] = - scaladslReadJournal.currentEventsByTag(tag, offset).asJava } object LeveldbReadJournal { diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala index d32ec788c3..8c58493320 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala @@ -6,20 +6,17 @@ package akka.persistence.query.journal.leveldb.scaladsl import java.net.URLEncoder import akka.NotUsed - -import scala.concurrent.duration._ import akka.actor.ExtendedActorSystem -import akka.persistence.query.{ EventEnvelope, EventEnvelope, Offset, Sequence } -import akka.persistence.query.journal.leveldb.AllPersistenceIdsPublisher -import akka.persistence.query.journal.leveldb.EventsByPersistenceIdPublisher -import akka.persistence.query.journal.leveldb.EventsByTagPublisher -import akka.persistence.query.scaladsl._ -import akka.persistence.query.scaladsl.ReadJournal -import akka.serialization.SerializationExtension -import akka.stream.scaladsl.{ Flow, Source } +import akka.event.Logging +import akka.persistence.query.journal.leveldb.{ AllPersistenceIdsPublisher, EventsByPersistenceIdPublisher, EventsByTagPublisher } +import akka.persistence.query.scaladsl.{ ReadJournal, _ } +import akka.persistence.query.{ EventEnvelope, Offset, Sequence } +import akka.stream.scaladsl.Source import akka.util.ByteString import com.typesafe.config.Config +import scala.concurrent.duration._ + /** * Scala API [[akka.persistence.query.scaladsl.ReadJournal]] implementation for LevelDB. * @@ -35,25 +32,14 @@ import com.typesafe.config.Config * for the default [[LeveldbReadJournal#Identifier]]. See `reference.conf`. */ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends ReadJournal - with PersistenceIdsQuery - with CurrentPersistenceIdsQuery - with EventsByPersistenceIdQuery - with CurrentEventsByPersistenceIdQuery - with EventsByTagQuery - with EventsByTagQuery - with CurrentEventsByTagQuery - with CurrentEventsByTagQuery { + with PersistenceIdsQuery with CurrentPersistenceIdsQuery + with EventsByPersistenceIdQuery with CurrentEventsByPersistenceIdQuery + with EventsByTagQuery with CurrentEventsByTagQuery { - private val serialization = SerializationExtension(system) private val refreshInterval = Some(config.getDuration("refresh-interval", MILLISECONDS).millis) private val writeJournalPluginId: String = config.getString("write-plugin") private val maxBufSize: Int = config.getInt("max-buffer-size") - private val envelopetoEnvelope2 = Flow[EventEnvelope].map { - case EventEnvelope(offset, persistenceId, sequenceNr, event) ⇒ - EventEnvelope(Sequence(offset), persistenceId, sequenceNr, event) - } - /** * `allPersistenceIds` is used for retrieving all `persistenceIds` of all * persistent actors. @@ -73,8 +59,7 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re * backend journal. */ override def persistenceIds(): Source[String, NotUsed] = { - // no polling for this query, the write journal will push all changes, i.e. - // no refreshInterval + // no polling for this query, the write journal will push all changes, i.e. no refreshInterval Source.actorPublisher[String](AllPersistenceIdsPublisher.props(liveQuery = true, maxBufSize, writeJournalPluginId)) .mapMaterializedValue(_ ⇒ NotUsed) .named("allPersistenceIds") @@ -103,7 +88,7 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re * * The returned event stream is ordered by sequence number, i.e. the same order as the * `PersistentActor` persisted the events. The same prefix of stream elements (in same order) - * are returned for multiple executions of the query, except for when events have been deleted. + * are returned for multiple executions of the query, except for when events have been deleted. * * The stream is not completed when it reaches the end of the currently stored events, * but it continues to push new events when new events are persisted. @@ -173,18 +158,15 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re */ override def eventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = offset match { - case Sequence(offsetValue) ⇒ - eventsByTag(tag, offsetValue).via(envelopetoEnvelope2) - case _ ⇒ - throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets") - } + case seq: Sequence ⇒ + Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, seq.value, Long.MaxValue, + refreshInterval, maxBufSize, writeJournalPluginId)) + .mapMaterializedValue(_ ⇒ NotUsed) + .named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) - override def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] = { - Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, offset, Long.MaxValue, - refreshInterval, maxBufSize, writeJournalPluginId)) - .mapMaterializedValue(_ ⇒ NotUsed) - .named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) - } + case _ ⇒ + throw new IllegalArgumentException("LevelDB does not support " + Logging.simpleName(offset.getClass) + " offsets") + } /** * Same type of query as [[#eventsByTag]] but the event stream @@ -193,19 +175,15 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re */ override def currentEventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = offset match { - case Sequence(offsetValue) ⇒ - currentEventsByTag(tag, offsetValue).via(envelopetoEnvelope2) + case seq: Sequence ⇒ + Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, seq.value, Long.MaxValue, + None, maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ NotUsed) + .named("currentEventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) case _ ⇒ - throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets") + throw new IllegalArgumentException("LevelDB does not support " + Logging.simpleName(offset.getClass) + " offsets") } - override def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] = { - Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, offset, Long.MaxValue, - None, maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ ⇒ NotUsed) - .named("currentEventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) - } - } object LeveldbReadJournal { diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala index ea432a466f..f98472521a 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala @@ -6,7 +6,7 @@ package akka.persistence.query.journal.leveldb import scala.concurrent.duration._ import akka.persistence.journal.Tagged import akka.persistence.journal.WriteEventAdapter -import akka.persistence.query.{ EventEnvelope, EventEnvelope, PersistenceQuery, Sequence } +import akka.persistence.query.{ EventEnvelope, PersistenceQuery, Sequence } import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal import akka.persistence.query.scaladsl.EventsByTagQuery import akka.stream.ActorMaterializer