Validate leveldb write plugin is enabled for leveldb read plugin (#29288)

* Validate leveldb write plugin is enabled for leveldb read plugin

And remove some weird overriding

* mima
This commit is contained in:
Christopher Batey 2020-06-24 08:01:03 +01:00 committed by GitHub
parent ce07d5f15a
commit 7dfcc0bfd0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 77 additions and 66 deletions

View file

@ -32,9 +32,8 @@ object LeveldbPersistenceQueryDocSpec {
//#tagger //#tagger
} }
class LeveldbPersistenceQueryDocSpec(config: String) extends AkkaSpec(config) { class LeveldbPersistenceQueryDocSpec
extends AkkaSpec("akka.persistence.journal.plugin = akka.persistence.journal.leveldb") {
def this() = this("")
"LeveldbPersistentQuery" must { "LeveldbPersistentQuery" must {
"demonstrate how get ReadJournal" in { "demonstrate how get ReadJournal" in {

View file

@ -0,0 +1,3 @@
ProblemFilters.exclude[Problem]("akka.persistence.query.journal.leveldb.AllPersistenceIdsStage.*")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.query.journal.leveldb.EventsByPersistenceIdStage.createLogicAndMaterializedValue")
ProblemFilters.exclude[Problem]("akka.persistence.query.journal.leveldb.EventsByTagStage.*")

View file

@ -4,7 +4,6 @@
package akka.persistence.query.journal.leveldb package akka.persistence.query.journal.leveldb
import akka.NotUsed
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence.Persistence import akka.persistence.Persistence
@ -22,7 +21,7 @@ import akka.stream.stage.TimerGraphStageLogicWithLogging
* INTERNAL API * INTERNAL API
*/ */
@InternalApi @InternalApi
final private[akka] class AllPersistenceIdsStage(liveQuery: Boolean, writeJournalPluginId: String) final private[akka] class AllPersistenceIdsStage(liveQuery: Boolean, writeJournalPluginId: String, mat: Materializer)
extends GraphStage[SourceShape[String]] { extends GraphStage[SourceShape[String]] {
val out: Outlet[String] = Outlet("AllPersistenceIds.out") val out: Outlet[String] = Outlet("AllPersistenceIds.out")
@ -30,14 +29,9 @@ final private[akka] class AllPersistenceIdsStage(liveQuery: Boolean, writeJourna
override def shape: SourceShape[String] = SourceShape(out) override def shape: SourceShape[String] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
throw new UnsupportedOperationException("Not used") new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[String] {
override private[akka] def createLogicAndMaterializedValue(
inheritedAttributes: Attributes,
eagerMaterializer: Materializer): (GraphStageLogic, NotUsed) = {
val logic = new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[String] {
setHandler(out, this) setHandler(out, this)
val journal: ActorRef = Persistence(eagerMaterializer.system).journalFor(writeJournalPluginId) val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId)
var initialResponseReceived = false var initialResponseReceived = false
override protected def logSource: Class[_] = classOf[AllPersistenceIdsStage] override protected def logSource: Class[_] = classOf[AllPersistenceIdsStage]
@ -71,7 +65,4 @@ final private[akka] class AllPersistenceIdsStage(liveQuery: Boolean, writeJourna
} }
} }
(logic, NotUsed)
}
} }

View file

@ -6,7 +6,6 @@ package akka.persistence.query.journal.leveldb
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.NotUsed
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence.JournalProtocol.RecoverySuccess import akka.persistence.JournalProtocol.RecoverySuccess
@ -53,14 +52,8 @@ final private[akka] class EventsByPersistenceIdStage(
override def shape: SourceShape[EventEnvelope] = SourceShape(out) override def shape: SourceShape[EventEnvelope] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
throw new UnsupportedOperationException("Not used") new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[EventEnvelope] {
override private[akka] def createLogicAndMaterializedValue(
inheritedAttributes: Attributes,
materializer: Materializer): (GraphStageLogic, NotUsed) = {
val logic = new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[EventEnvelope] {
val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId) val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId)
var currSeqNo = fromSequenceNr
var stageActorRef: ActorRef = null var stageActorRef: ActorRef = null
var replayInProgress = false var replayInProgress = false
var outstandingReplay = false var outstandingReplay = false
@ -161,7 +154,4 @@ final private[akka] class EventsByPersistenceIdStage(
setHandler(out, this) setHandler(out, this)
} }
(logic, NotUsed)
}
} }

View file

@ -6,7 +6,6 @@ package akka.persistence.query.journal.leveldb
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.NotUsed
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence.JournalProtocol.RecoverySuccess import akka.persistence.JournalProtocol.RecoverySuccess
@ -45,7 +44,8 @@ final private[leveldb] class EventsByTagStage(
maxBufSize: Int, maxBufSize: Int,
initialTooOffset: Long, initialTooOffset: Long,
writeJournalPluginId: String, writeJournalPluginId: String,
refreshInterval: Option[FiniteDuration]) refreshInterval: Option[FiniteDuration],
mat: Materializer)
extends GraphStage[SourceShape[EventEnvelope]] { extends GraphStage[SourceShape[EventEnvelope]] {
val out: Outlet[EventEnvelope] = Outlet("EventsByTagSource") val out: Outlet[EventEnvelope] = Outlet("EventsByTagSource")
@ -53,14 +53,8 @@ final private[leveldb] class EventsByTagStage(
override def shape: SourceShape[EventEnvelope] = SourceShape(out) override def shape: SourceShape[EventEnvelope] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
throw new UnsupportedOperationException("Not used") new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[EventEnvelope] {
val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId)
override private[akka] def createLogicAndMaterializedValue(
inheritedAttributes: Attributes,
eagerMaterializer: Materializer): (GraphStageLogic, NotUsed) = {
val logic = new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[EventEnvelope] {
val journal: ActorRef = Persistence(eagerMaterializer.system).journalFor(writeJournalPluginId)
var currOffset: Long = fromOffset var currOffset: Long = fromOffset
var toOffset: Long = initialTooOffset var toOffset: Long = initialTooOffset
var stageActorRef: ActorRef = null var stageActorRef: ActorRef = null
@ -158,8 +152,4 @@ final private[leveldb] class EventsByTagStage(
setHandler(out, this) setHandler(out, this)
} }
(logic, NotUsed)
}
} }

View file

@ -18,7 +18,7 @@ import akka.stream.javadsl.Source
* PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier()); * PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier());
* }}} * }}}
* *
* Corresponding Scala API is in [[akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal]]. * Corresponding Scala API is in [[akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal]].
* *
* Configuration settings can be defined in the configuration section with the * Configuration settings can be defined in the configuration section with the
* absolute path corresponding to the identifier, which is `"akka.persistence.query.journal.leveldb"` * absolute path corresponding to the identifier, which is `"akka.persistence.query.journal.leveldb"`

View file

@ -24,7 +24,6 @@ import akka.persistence.query.scaladsl._
import akka.persistence.query.scaladsl.ReadJournal import akka.persistence.query.scaladsl.ReadJournal
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
import akka.util.ByteString import akka.util.ByteString
import akka.util.unused
/** /**
* Scala API [[akka.persistence.query.scaladsl.ReadJournal]] implementation for LevelDB. * Scala API [[akka.persistence.query.scaladsl.ReadJournal]] implementation for LevelDB.
@ -40,7 +39,7 @@ import akka.util.unused
* absolute path corresponding to the identifier, which is `"akka.persistence.query.journal.leveldb"` * absolute path corresponding to the identifier, which is `"akka.persistence.query.journal.leveldb"`
* for the default [[LeveldbReadJournal#Identifier]]. See `reference.conf`. * for the default [[LeveldbReadJournal#Identifier]]. See `reference.conf`.
*/ */
class LeveldbReadJournal(@unused system: ExtendedActorSystem, config: Config) class LeveldbReadJournal(system: ExtendedActorSystem, config: Config)
extends ReadJournal extends ReadJournal
with PersistenceIdsQuery with PersistenceIdsQuery
with CurrentPersistenceIdsQuery with CurrentPersistenceIdsQuery
@ -53,6 +52,17 @@ class LeveldbReadJournal(@unused system: ExtendedActorSystem, config: Config)
private val writeJournalPluginId: String = config.getString("write-plugin") private val writeJournalPluginId: String = config.getString("write-plugin")
private val maxBufSize: Int = config.getInt("max-buffer-size") private val maxBufSize: Int = config.getInt("max-buffer-size")
private val resolvedWriteJournalPluginId =
if (writeJournalPluginId.isEmpty)
system.settings.config.getString("akka.persistence.journal.plugin")
else
writeJournalPluginId
require(
resolvedWriteJournalPluginId.nonEmpty && system.settings.config
.getConfig(resolvedWriteJournalPluginId)
.getString("class") == "akka.persistence.journal.leveldb.LeveldbJournal",
s"Leveldb read journal can only work with a Leveldb write journal. Current plugin [$resolvedWriteJournalPluginId] is not a LeveldbJournal")
/** /**
* `persistenceIds` is used for retrieving all `persistenceIds` of all * `persistenceIds` is used for retrieving all `persistenceIds` of all
* persistent actors. * persistent actors.
@ -73,7 +83,13 @@ class LeveldbReadJournal(@unused system: ExtendedActorSystem, config: Config)
*/ */
override def persistenceIds(): Source[String, NotUsed] = override def persistenceIds(): Source[String, NotUsed] =
// no polling for this query, the write journal will push all changes, i.e. no refreshInterval // no polling for this query, the write journal will push all changes, i.e. no refreshInterval
Source.fromGraph(new AllPersistenceIdsStage(liveQuery = true, writeJournalPluginId)).named("allPersistenceIds") Source
.fromMaterializer { (mat, _) =>
Source
.fromGraph(new AllPersistenceIdsStage(liveQuery = true, writeJournalPluginId, mat))
.named("allPersistenceIds")
}
.mapMaterializedValue(_ => NotUsed)
/** /**
* Same type of query as [[#persistenceIds]] but the stream * Same type of query as [[#persistenceIds]] but the stream
@ -81,7 +97,13 @@ class LeveldbReadJournal(@unused system: ExtendedActorSystem, config: Config)
* actors that are created after the query is completed are not included in the stream. * actors that are created after the query is completed are not included in the stream.
*/ */
override def currentPersistenceIds(): Source[String, NotUsed] = override def currentPersistenceIds(): Source[String, NotUsed] =
Source.fromGraph(new AllPersistenceIdsStage(liveQuery = false, writeJournalPluginId)).named("allPersistenceIds") Source
.fromMaterializer { (mat, _) =>
Source
.fromGraph(new AllPersistenceIdsStage(liveQuery = false, writeJournalPluginId, mat))
.named("allPersistenceIds")
}
.mapMaterializedValue(_ => NotUsed)
/** /**
* `eventsByPersistenceId` is used for retrieving events for a specific * `eventsByPersistenceId` is used for retrieving events for a specific
@ -197,18 +219,29 @@ class LeveldbReadJournal(@unused system: ExtendedActorSystem, config: Config)
* backend journal. * backend journal.
*/ */
override def eventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = override def eventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] =
offset match { Source
case seq: Sequence => .fromMaterializer { (mat, _) =>
Source offset match {
.fromGraph( case seq: Sequence =>
new EventsByTagStage(tag, seq.value, maxBufSize, Long.MaxValue, writeJournalPluginId, refreshInterval)) Source
.named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) .fromGraph(
new EventsByTagStage(
tag,
seq.value,
maxBufSize,
Long.MaxValue,
writeJournalPluginId,
refreshInterval,
mat))
.named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8))
case NoOffset => eventsByTag(tag, Sequence(0L)) //recursive case NoOffset => eventsByTag(tag, Sequence(0L)) //recursive
case _ => case _ =>
throw new IllegalArgumentException( throw new IllegalArgumentException(
"LevelDB does not support " + Logging.simpleName(offset.getClass) + " offsets") "LevelDB does not support " + Logging.simpleName(offset.getClass) + " offsets")
} }
}
.mapMaterializedValue(_ => NotUsed)
/** /**
* Same type of query as [[#eventsByTag]] but the event stream * Same type of query as [[#eventsByTag]] but the event stream
@ -216,16 +249,21 @@ class LeveldbReadJournal(@unused system: ExtendedActorSystem, config: Config)
* stored after the query is completed are not included in the event stream. * stored after the query is completed are not included in the event stream.
*/ */
override def currentEventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = override def currentEventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] =
offset match { Source
case seq: Sequence => .fromMaterializer { (mat, _) =>
Source offset match {
.fromGraph(new EventsByTagStage(tag, seq.value, maxBufSize, Long.MaxValue, writeJournalPluginId, None)) case seq: Sequence =>
.named("currentEventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)) Source
case NoOffset => currentEventsByTag(tag, Sequence(0L)) .fromGraph(
case _ => new EventsByTagStage(tag, seq.value, maxBufSize, Long.MaxValue, writeJournalPluginId, None, mat))
throw new IllegalArgumentException( .named("currentEventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8))
"LevelDB does not support " + Logging.simpleName(offset.getClass) + " offsets") case NoOffset => currentEventsByTag(tag, Sequence(0L))
} case _ =>
throw new IllegalArgumentException(
"LevelDB does not support " + Logging.simpleName(offset.getClass) + " offsets")
}
}
.mapMaterializedValue(_ => NotUsed)
} }