From 7dfcc0bfd0546e4e7315d83e6c1caaa12d2e8a37 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Wed, 24 Jun 2020 08:01:03 +0100 Subject: [PATCH] Validate leveldb write plugin is enabled for leveldb read plugin (#29288) * Validate leveldb write plugin is enabled for leveldb read plugin And remove some weird overriding * mima --- .../LeveldbPersistenceQueryDocSpec.scala | 5 +- .../29288-leveldb-cleanups.excludes | 3 + .../leveldb/AllPersistenceIdsStage.scala | 15 +--- .../leveldb/EventsByPersistenceIdStage.scala | 12 +-- .../journal/leveldb/EventsByTagStage.scala | 18 +--- .../leveldb/javadsl/LeveldbReadJournal.scala | 2 +- .../leveldb/scaladsl/LeveldbReadJournal.scala | 88 +++++++++++++------ 7 files changed, 77 insertions(+), 66 deletions(-) create mode 100644 akka-persistence-query/src/main/mima-filters/2.6.7.backwards.excludes/29288-leveldb-cleanups.excludes diff --git a/akka-docs/src/test/scala/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala b/akka-docs/src/test/scala/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala index bcf7a68465..e6880d1a3d 100644 --- a/akka-docs/src/test/scala/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala +++ b/akka-docs/src/test/scala/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala @@ -32,9 +32,8 @@ object LeveldbPersistenceQueryDocSpec { //#tagger } -class LeveldbPersistenceQueryDocSpec(config: String) extends AkkaSpec(config) { - - def this() = this("") +class LeveldbPersistenceQueryDocSpec + extends AkkaSpec("akka.persistence.journal.plugin = akka.persistence.journal.leveldb") { "LeveldbPersistentQuery" must { "demonstrate how get ReadJournal" in { diff --git a/akka-persistence-query/src/main/mima-filters/2.6.7.backwards.excludes/29288-leveldb-cleanups.excludes b/akka-persistence-query/src/main/mima-filters/2.6.7.backwards.excludes/29288-leveldb-cleanups.excludes new file mode 100644 index 0000000000..a7fa1f0f7d --- /dev/null +++ b/akka-persistence-query/src/main/mima-filters/2.6.7.backwards.excludes/29288-leveldb-cleanups.excludes @@ -0,0 +1,3 @@ +ProblemFilters.exclude[Problem]("akka.persistence.query.journal.leveldb.AllPersistenceIdsStage.*") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.query.journal.leveldb.EventsByPersistenceIdStage.createLogicAndMaterializedValue") +ProblemFilters.exclude[Problem]("akka.persistence.query.journal.leveldb.EventsByTagStage.*") diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsStage.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsStage.scala index 10fdf734a5..e31c4a9fd0 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsStage.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsStage.scala @@ -4,7 +4,6 @@ package akka.persistence.query.journal.leveldb -import akka.NotUsed import akka.actor.ActorRef import akka.annotation.InternalApi import akka.persistence.Persistence @@ -22,7 +21,7 @@ import akka.stream.stage.TimerGraphStageLogicWithLogging * INTERNAL API */ @InternalApi -final private[akka] class AllPersistenceIdsStage(liveQuery: Boolean, writeJournalPluginId: String) +final private[akka] class AllPersistenceIdsStage(liveQuery: Boolean, writeJournalPluginId: String, mat: Materializer) extends GraphStage[SourceShape[String]] { val out: Outlet[String] = Outlet("AllPersistenceIds.out") @@ -30,14 +29,9 @@ final private[akka] class AllPersistenceIdsStage(liveQuery: Boolean, writeJourna override def shape: SourceShape[String] = SourceShape(out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - throw new UnsupportedOperationException("Not used") - - override private[akka] def createLogicAndMaterializedValue( - inheritedAttributes: Attributes, - eagerMaterializer: Materializer): (GraphStageLogic, NotUsed) = { - val logic = new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[String] { + new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[String] { setHandler(out, this) - val journal: ActorRef = Persistence(eagerMaterializer.system).journalFor(writeJournalPluginId) + val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId) var initialResponseReceived = false override protected def logSource: Class[_] = classOf[AllPersistenceIdsStage] @@ -71,7 +65,4 @@ final private[akka] class AllPersistenceIdsStage(liveQuery: Boolean, writeJourna } } - - (logic, NotUsed) - } } diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala index 1fbfcfbc06..70aebd84b8 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala @@ -6,7 +6,6 @@ package akka.persistence.query.journal.leveldb import scala.concurrent.duration.FiniteDuration -import akka.NotUsed import akka.actor.ActorRef import akka.annotation.InternalApi import akka.persistence.JournalProtocol.RecoverySuccess @@ -53,14 +52,8 @@ final private[akka] class EventsByPersistenceIdStage( override def shape: SourceShape[EventEnvelope] = SourceShape(out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - throw new UnsupportedOperationException("Not used") - - override private[akka] def createLogicAndMaterializedValue( - inheritedAttributes: Attributes, - materializer: Materializer): (GraphStageLogic, NotUsed) = { - val logic = new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[EventEnvelope] { + new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[EventEnvelope] { val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId) - var currSeqNo = fromSequenceNr var stageActorRef: ActorRef = null var replayInProgress = false var outstandingReplay = false @@ -161,7 +154,4 @@ final private[akka] class EventsByPersistenceIdStage( setHandler(out, this) } - (logic, NotUsed) - } - } diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagStage.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagStage.scala index 79569a587a..610071f827 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagStage.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagStage.scala @@ -6,7 +6,6 @@ package akka.persistence.query.journal.leveldb import scala.concurrent.duration.FiniteDuration -import akka.NotUsed import akka.actor.ActorRef import akka.annotation.InternalApi import akka.persistence.JournalProtocol.RecoverySuccess @@ -45,7 +44,8 @@ final private[leveldb] class EventsByTagStage( maxBufSize: Int, initialTooOffset: Long, writeJournalPluginId: String, - refreshInterval: Option[FiniteDuration]) + refreshInterval: Option[FiniteDuration], + mat: Materializer) extends GraphStage[SourceShape[EventEnvelope]] { val out: Outlet[EventEnvelope] = Outlet("EventsByTagSource") @@ -53,14 +53,8 @@ final private[leveldb] class EventsByTagStage( override def shape: SourceShape[EventEnvelope] = SourceShape(out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - throw new UnsupportedOperationException("Not used") - - override private[akka] def createLogicAndMaterializedValue( - inheritedAttributes: Attributes, - eagerMaterializer: Materializer): (GraphStageLogic, NotUsed) = { - - val logic = new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[EventEnvelope] { - val journal: ActorRef = Persistence(eagerMaterializer.system).journalFor(writeJournalPluginId) + new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[EventEnvelope] { + val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId) var currOffset: Long = fromOffset var toOffset: Long = initialTooOffset var stageActorRef: ActorRef = null @@ -158,8 +152,4 @@ final private[leveldb] class EventsByTagStage( setHandler(out, this) } - - (logic, NotUsed) - } - } diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala index 34c798ef03..624fc9fc44 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/javadsl/LeveldbReadJournal.scala @@ -18,7 +18,7 @@ import akka.stream.javadsl.Source * PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier()); * }}} * - * Corresponding Scala API is in [[akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal]]. + * Corresponding Scala API is in [[akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal]]. * * Configuration settings can be defined in the configuration section with the * absolute path corresponding to the identifier, which is `"akka.persistence.query.journal.leveldb"` diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala index 0e3a81d19c..f71fa0a744 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/scaladsl/LeveldbReadJournal.scala @@ -24,7 +24,6 @@ import akka.persistence.query.scaladsl._ import akka.persistence.query.scaladsl.ReadJournal import akka.stream.scaladsl.Source import akka.util.ByteString -import akka.util.unused /** * Scala API [[akka.persistence.query.scaladsl.ReadJournal]] implementation for LevelDB. @@ -40,7 +39,7 @@ import akka.util.unused * absolute path corresponding to the identifier, which is `"akka.persistence.query.journal.leveldb"` * for the default [[LeveldbReadJournal#Identifier]]. See `reference.conf`. */ -class LeveldbReadJournal(@unused system: ExtendedActorSystem, config: Config) +class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends ReadJournal with PersistenceIdsQuery with CurrentPersistenceIdsQuery @@ -53,6 +52,17 @@ class LeveldbReadJournal(@unused system: ExtendedActorSystem, config: Config) private val writeJournalPluginId: String = config.getString("write-plugin") private val maxBufSize: Int = config.getInt("max-buffer-size") + private val resolvedWriteJournalPluginId = + if (writeJournalPluginId.isEmpty) + system.settings.config.getString("akka.persistence.journal.plugin") + else + writeJournalPluginId + require( + resolvedWriteJournalPluginId.nonEmpty && system.settings.config + .getConfig(resolvedWriteJournalPluginId) + .getString("class") == "akka.persistence.journal.leveldb.LeveldbJournal", + s"Leveldb read journal can only work with a Leveldb write journal. Current plugin [$resolvedWriteJournalPluginId] is not a LeveldbJournal") + /** * `persistenceIds` is used for retrieving all `persistenceIds` of all * persistent actors. @@ -73,7 +83,13 @@ class LeveldbReadJournal(@unused system: ExtendedActorSystem, config: Config) */ override def persistenceIds(): Source[String, NotUsed] = // no polling for this query, the write journal will push all changes, i.e. no refreshInterval - Source.fromGraph(new AllPersistenceIdsStage(liveQuery = true, writeJournalPluginId)).named("allPersistenceIds") + Source + .fromMaterializer { (mat, _) => + Source + .fromGraph(new AllPersistenceIdsStage(liveQuery = true, writeJournalPluginId, mat)) + .named("allPersistenceIds") + } + .mapMaterializedValue(_ => NotUsed) /** * Same type of query as [[#persistenceIds]] but the stream @@ -81,7 +97,13 @@ class LeveldbReadJournal(@unused system: ExtendedActorSystem, config: Config) * actors that are created after the query is completed are not included in the stream. */ override def currentPersistenceIds(): Source[String, NotUsed] = - Source.fromGraph(new AllPersistenceIdsStage(liveQuery = false, writeJournalPluginId)).named("allPersistenceIds") + Source + .fromMaterializer { (mat, _) => + Source + .fromGraph(new AllPersistenceIdsStage(liveQuery = false, writeJournalPluginId, mat)) + .named("allPersistenceIds") + } + .mapMaterializedValue(_ => NotUsed) /** * `eventsByPersistenceId` is used for retrieving events for a specific @@ -197,18 +219,29 @@ class LeveldbReadJournal(@unused system: ExtendedActorSystem, config: Config) * backend journal. */ override def eventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = - offset match { - case seq: Sequence => - Source - .fromGraph( - new EventsByTagStage(tag, seq.value, maxBufSize, Long.MaxValue, writeJournalPluginId, refreshInterval)) - .named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) + Source + .fromMaterializer { (mat, _) => + offset match { + case seq: Sequence => + Source + .fromGraph( + new EventsByTagStage( + tag, + seq.value, + maxBufSize, + Long.MaxValue, + writeJournalPluginId, + refreshInterval, + mat)) + .named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) - case NoOffset => eventsByTag(tag, Sequence(0L)) //recursive - case _ => - throw new IllegalArgumentException( - "LevelDB does not support " + Logging.simpleName(offset.getClass) + " offsets") - } + case NoOffset => eventsByTag(tag, Sequence(0L)) //recursive + case _ => + throw new IllegalArgumentException( + "LevelDB does not support " + Logging.simpleName(offset.getClass) + " offsets") + } + } + .mapMaterializedValue(_ => NotUsed) /** * Same type of query as [[#eventsByTag]] but the event stream @@ -216,16 +249,21 @@ class LeveldbReadJournal(@unused system: ExtendedActorSystem, config: Config) * stored after the query is completed are not included in the event stream. */ override def currentEventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = - offset match { - case seq: Sequence => - Source - .fromGraph(new EventsByTagStage(tag, seq.value, maxBufSize, Long.MaxValue, writeJournalPluginId, None)) - .named("currentEventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) - case NoOffset => currentEventsByTag(tag, Sequence(0L)) - case _ => - throw new IllegalArgumentException( - "LevelDB does not support " + Logging.simpleName(offset.getClass) + " offsets") - } + Source + .fromMaterializer { (mat, _) => + offset match { + case seq: Sequence => + Source + .fromGraph( + new EventsByTagStage(tag, seq.value, maxBufSize, Long.MaxValue, writeJournalPluginId, None, mat)) + .named("currentEventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) + case NoOffset => currentEventsByTag(tag, Sequence(0L)) + case _ => + throw new IllegalArgumentException( + "LevelDB does not support " + Logging.simpleName(offset.getClass) + " offsets") + } + } + .mapMaterializedValue(_ => NotUsed) }