Deprecate actor materializer (#27538)
This commit is contained in:
parent
8765a4fbe6
commit
b68d67008a
118 changed files with 1233 additions and 731 deletions
|
|
@ -4,31 +4,40 @@
|
|||
|
||||
package akka.persistence.query.journal.leveldb
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.actor.ActorRef
|
||||
import akka.annotation.InternalApi
|
||||
import akka.persistence.Persistence
|
||||
import akka.persistence.journal.leveldb.LeveldbJournal
|
||||
import akka.stream.{ ActorMaterializer, Attributes, Outlet, SourceShape }
|
||||
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler, TimerGraphStageLogicWithLogging }
|
||||
import akka.stream.Attributes
|
||||
import akka.stream.Materializer
|
||||
import akka.stream.Outlet
|
||||
import akka.stream.SourceShape
|
||||
import akka.stream.stage.GraphStage
|
||||
import akka.stream.stage.GraphStageLogic
|
||||
import akka.stream.stage.OutHandler
|
||||
import akka.stream.stage.TimerGraphStageLogicWithLogging
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
final private[akka] class AllPersistenceIdsStage(
|
||||
liveQuery: Boolean,
|
||||
writeJournalPluginId: String,
|
||||
mat: ActorMaterializer)
|
||||
final private[akka] class AllPersistenceIdsStage(liveQuery: Boolean, writeJournalPluginId: String)
|
||||
extends GraphStage[SourceShape[String]] {
|
||||
|
||||
val out: Outlet[String] = Outlet("AllPersistenceIds.out")
|
||||
|
||||
override def shape: SourceShape[String] = SourceShape(out)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
|
||||
new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[String] {
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
throw new UnsupportedOperationException("Not used")
|
||||
|
||||
override private[akka] def createLogicAndMaterializedValue(
|
||||
inheritedAttributes: Attributes,
|
||||
eagerMaterializer: Materializer): (GraphStageLogic, NotUsed) = {
|
||||
val logic = new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[String] {
|
||||
setHandler(out, this)
|
||||
val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId)
|
||||
val journal: ActorRef = Persistence(eagerMaterializer.system).journalFor(writeJournalPluginId)
|
||||
var initialResponseReceived = false
|
||||
|
||||
override def preStart(): Unit = {
|
||||
|
|
@ -60,5 +69,7 @@ final private[akka] class AllPersistenceIdsStage(
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
(logic, NotUsed)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,16 +4,27 @@
|
|||
|
||||
package akka.persistence.query.journal.leveldb
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.actor.ActorRef
|
||||
import akka.annotation.InternalApi
|
||||
import akka.persistence.JournalProtocol.{ RecoverySuccess, ReplayMessages, ReplayMessagesFailure, ReplayedMessage }
|
||||
import akka.persistence.JournalProtocol.RecoverySuccess
|
||||
import akka.persistence.JournalProtocol.ReplayMessages
|
||||
import akka.persistence.JournalProtocol.ReplayMessagesFailure
|
||||
import akka.persistence.JournalProtocol.ReplayedMessage
|
||||
import akka.persistence.Persistence
|
||||
import akka.persistence.journal.leveldb.LeveldbJournal
|
||||
import akka.persistence.journal.leveldb.LeveldbJournal.EventAppended
|
||||
import akka.persistence.query.{ EventEnvelope, Sequence }
|
||||
import akka.persistence.query.EventEnvelope
|
||||
import akka.persistence.query.Sequence
|
||||
import akka.persistence.query.journal.leveldb.EventsByPersistenceIdStage.Continue
|
||||
import akka.stream.{ ActorMaterializer, Attributes, Outlet, SourceShape }
|
||||
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler, TimerGraphStageLogicWithLogging }
|
||||
import akka.stream.Attributes
|
||||
import akka.stream.Materializer
|
||||
import akka.stream.Outlet
|
||||
import akka.stream.SourceShape
|
||||
import akka.stream.stage.GraphStage
|
||||
import akka.stream.stage.GraphStageLogic
|
||||
import akka.stream.stage.OutHandler
|
||||
import akka.stream.stage.TimerGraphStageLogicWithLogging
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
||||
|
|
@ -36,11 +47,17 @@ final private[akka] class EventsByPersistenceIdStage(
|
|||
maxBufSize: Int,
|
||||
writeJournalPluginId: String,
|
||||
refreshInterval: Option[FiniteDuration],
|
||||
mat: ActorMaterializer)
|
||||
mat: Materializer)
|
||||
extends GraphStage[SourceShape[EventEnvelope]] {
|
||||
val out: Outlet[EventEnvelope] = Outlet("EventsByPersistenceIdSource")
|
||||
override def shape: SourceShape[EventEnvelope] = SourceShape(out)
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
throw new UnsupportedOperationException("Not used")
|
||||
|
||||
override private[akka] def createLogicAndMaterializedValue(
|
||||
inheritedAttributes: Attributes,
|
||||
materializer: Materializer): (GraphStageLogic, NotUsed) = {
|
||||
val logic = new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[EventEnvelope] {
|
||||
val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId)
|
||||
var currSeqNo = fromSequenceNr
|
||||
|
|
@ -140,7 +157,7 @@ final private[akka] class EventsByPersistenceIdStage(
|
|||
|
||||
setHandler(out, this)
|
||||
}
|
||||
logic
|
||||
(logic, NotUsed)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,20 +4,27 @@
|
|||
|
||||
package akka.persistence.query.journal.leveldb
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.actor.ActorRef
|
||||
import akka.annotation.InternalApi
|
||||
import akka.persistence.JournalProtocol.{ RecoverySuccess, ReplayMessagesFailure }
|
||||
import akka.persistence.JournalProtocol.RecoverySuccess
|
||||
import akka.persistence.JournalProtocol.ReplayMessagesFailure
|
||||
import akka.persistence.Persistence
|
||||
import akka.persistence.journal.leveldb.LeveldbJournal
|
||||
import akka.persistence.journal.leveldb.LeveldbJournal.{
|
||||
ReplayTaggedMessages,
|
||||
ReplayedTaggedMessage,
|
||||
TaggedEventAppended
|
||||
}
|
||||
import akka.persistence.journal.leveldb.LeveldbJournal.ReplayTaggedMessages
|
||||
import akka.persistence.journal.leveldb.LeveldbJournal.ReplayedTaggedMessage
|
||||
import akka.persistence.journal.leveldb.LeveldbJournal.TaggedEventAppended
|
||||
import akka.persistence.query.journal.leveldb.EventsByTagStage.Continue
|
||||
import akka.persistence.query.{ EventEnvelope, Sequence }
|
||||
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler, TimerGraphStageLogicWithLogging }
|
||||
import akka.stream.{ ActorMaterializer, Attributes, Outlet, SourceShape }
|
||||
import akka.persistence.query.EventEnvelope
|
||||
import akka.persistence.query.Sequence
|
||||
import akka.stream.Materializer
|
||||
import akka.stream.stage.GraphStage
|
||||
import akka.stream.stage.GraphStageLogic
|
||||
import akka.stream.stage.OutHandler
|
||||
import akka.stream.stage.TimerGraphStageLogicWithLogging
|
||||
import akka.stream.Attributes
|
||||
import akka.stream.Outlet
|
||||
import akka.stream.SourceShape
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
||||
|
|
@ -38,7 +45,6 @@ final private[leveldb] class EventsByTagStage(
|
|||
maxBufSize: Int,
|
||||
initialTooOffset: Long,
|
||||
writeJournalPluginId: String,
|
||||
mat: ActorMaterializer,
|
||||
refreshInterval: Option[FiniteDuration])
|
||||
extends GraphStage[SourceShape[EventEnvelope]] {
|
||||
|
||||
|
|
@ -46,9 +52,15 @@ final private[leveldb] class EventsByTagStage(
|
|||
|
||||
override def shape: SourceShape[EventEnvelope] = SourceShape(out)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
throw new UnsupportedOperationException("Not used")
|
||||
|
||||
override private[akka] def createLogicAndMaterializedValue(
|
||||
inheritedAttributes: Attributes,
|
||||
eagerMaterializer: Materializer): (GraphStageLogic, NotUsed) = {
|
||||
|
||||
val logic = new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[EventEnvelope] {
|
||||
val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId)
|
||||
val journal: ActorRef = Persistence(eagerMaterializer.system).journalFor(writeJournalPluginId)
|
||||
var currOffset: Long = fromOffset
|
||||
var toOffset: Long = initialTooOffset
|
||||
var stageActorRef: ActorRef = null
|
||||
|
|
@ -144,7 +156,7 @@ final private[leveldb] class EventsByTagStage(
|
|||
setHandler(out, this)
|
||||
}
|
||||
|
||||
logic
|
||||
(logic, NotUsed)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,11 +9,18 @@ import java.net.URLEncoder
|
|||
import akka.NotUsed
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.event.Logging
|
||||
import akka.persistence.query.journal.leveldb.{ AllPersistenceIdsStage, EventsByPersistenceIdStage, EventsByTagStage }
|
||||
import akka.persistence.query.scaladsl.{ ReadJournal, _ }
|
||||
import akka.persistence.query.{ EventEnvelope, NoOffset, Offset, Sequence }
|
||||
import akka.persistence.query.journal.leveldb.AllPersistenceIdsStage
|
||||
import akka.persistence.query.journal.leveldb.EventsByPersistenceIdStage
|
||||
import akka.persistence.query.journal.leveldb.EventsByTagStage
|
||||
import akka.persistence.query.scaladsl.ReadJournal
|
||||
import akka.persistence.query.scaladsl._
|
||||
import akka.persistence.query.EventEnvelope
|
||||
import akka.persistence.query.NoOffset
|
||||
import akka.persistence.query.Offset
|
||||
import akka.persistence.query.Sequence
|
||||
import akka.stream.scaladsl.Source
|
||||
import akka.util.{ unused, ByteString }
|
||||
import akka.util.ByteString
|
||||
import akka.util.unused
|
||||
import com.typesafe.config.Config
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
|
@ -63,32 +70,17 @@ class LeveldbReadJournal(@unused system: ExtendedActorSystem, config: Config)
|
|||
* The stream is completed with failure if there is a failure in executing the query in the
|
||||
* backend journal.
|
||||
*/
|
||||
override def persistenceIds(): Source[String, NotUsed] = {
|
||||
override def persistenceIds(): Source[String, NotUsed] =
|
||||
// no polling for this query, the write journal will push all changes, i.e. no refreshInterval
|
||||
Source
|
||||
.setup { (mat, _) =>
|
||||
Source
|
||||
.fromGraph(new AllPersistenceIdsStage(liveQuery = true, writeJournalPluginId, mat))
|
||||
.named("allPersistenceIds")
|
||||
}
|
||||
.mapMaterializedValue(_ => NotUsed)
|
||||
|
||||
}
|
||||
Source.fromGraph(new AllPersistenceIdsStage(liveQuery = true, writeJournalPluginId)).named("allPersistenceIds")
|
||||
|
||||
/**
|
||||
* Same type of query as [[#persistenceIds]] but the stream
|
||||
* is completed immediately when it reaches the end of the "result set". Persistent
|
||||
* actors that are created after the query is completed are not included in the stream.
|
||||
*/
|
||||
override def currentPersistenceIds(): Source[String, NotUsed] = {
|
||||
Source
|
||||
.setup { (mat, _) =>
|
||||
Source
|
||||
.fromGraph(new AllPersistenceIdsStage(liveQuery = false, writeJournalPluginId, mat))
|
||||
.named("allPersistenceIds")
|
||||
}
|
||||
.mapMaterializedValue(_ => NotUsed)
|
||||
}
|
||||
override def currentPersistenceIds(): Source[String, NotUsed] =
|
||||
Source.fromGraph(new AllPersistenceIdsStage(liveQuery = false, writeJournalPluginId)).named("allPersistenceIds")
|
||||
|
||||
/**
|
||||
* `eventsByPersistenceId` is used for retrieving events for a specific
|
||||
|
|
@ -121,7 +113,7 @@ class LeveldbReadJournal(@unused system: ExtendedActorSystem, config: Config)
|
|||
fromSequenceNr: Long = 0L,
|
||||
toSequenceNr: Long = Long.MaxValue): Source[EventEnvelope, NotUsed] = {
|
||||
Source
|
||||
.setup { (mat, _) =>
|
||||
.fromMaterializer { (mat, _) =>
|
||||
Source
|
||||
.fromGraph(
|
||||
new EventsByPersistenceIdStage(
|
||||
|
|
@ -147,7 +139,7 @@ class LeveldbReadJournal(@unused system: ExtendedActorSystem, config: Config)
|
|||
fromSequenceNr: Long = 0L,
|
||||
toSequenceNr: Long = Long.MaxValue): Source[EventEnvelope, NotUsed] = {
|
||||
Source
|
||||
.setup { (mat, _) =>
|
||||
.fromMaterializer { (mat, _) =>
|
||||
Source
|
||||
.fromGraph(
|
||||
new EventsByPersistenceIdStage(
|
||||
|
|
@ -207,20 +199,10 @@ class LeveldbReadJournal(@unused system: ExtendedActorSystem, config: Config)
|
|||
offset match {
|
||||
case seq: Sequence =>
|
||||
Source
|
||||
.setup { (mat, _) =>
|
||||
Source
|
||||
.fromGraph(
|
||||
new EventsByTagStage(
|
||||
tag,
|
||||
seq.value,
|
||||
maxBufSize,
|
||||
Long.MaxValue,
|
||||
writeJournalPluginId,
|
||||
mat,
|
||||
refreshInterval))
|
||||
.named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8))
|
||||
}
|
||||
.mapMaterializedValue(_ => NotUsed)
|
||||
.fromGraph(
|
||||
new EventsByTagStage(tag, seq.value, maxBufSize, Long.MaxValue, writeJournalPluginId, refreshInterval))
|
||||
.named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8))
|
||||
|
||||
case NoOffset => eventsByTag(tag, Sequence(0L)) //recursive
|
||||
case _ =>
|
||||
throw new IllegalArgumentException(
|
||||
|
|
@ -232,23 +214,17 @@ class LeveldbReadJournal(@unused system: ExtendedActorSystem, config: Config)
|
|||
* is completed immediately when it reaches the end of the "result set". Events that are
|
||||
* stored after the query is completed are not included in the event stream.
|
||||
*/
|
||||
override def currentEventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = {
|
||||
Source
|
||||
.setup { (mat, _) =>
|
||||
offset match {
|
||||
case seq: Sequence =>
|
||||
Source
|
||||
.fromGraph(
|
||||
new EventsByTagStage(tag, seq.value, maxBufSize, Long.MaxValue, writeJournalPluginId, mat, None))
|
||||
.named("currentEventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8))
|
||||
case NoOffset => currentEventsByTag(tag, Sequence(0L))
|
||||
case _ =>
|
||||
throw new IllegalArgumentException(
|
||||
"LevelDB does not support " + Logging.simpleName(offset.getClass) + " offsets")
|
||||
}
|
||||
}
|
||||
.mapMaterializedValue(_ => NotUsed)
|
||||
}
|
||||
override def currentEventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] =
|
||||
offset match {
|
||||
case seq: Sequence =>
|
||||
Source
|
||||
.fromGraph(new EventsByTagStage(tag, seq.value, maxBufSize, Long.MaxValue, writeJournalPluginId, None))
|
||||
.named("currentEventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8))
|
||||
case NoOffset => currentEventsByTag(tag, Sequence(0L))
|
||||
case _ =>
|
||||
throw new IllegalArgumentException(
|
||||
"LevelDB does not support " + Logging.simpleName(offset.getClass) + " offsets")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,16 +4,15 @@
|
|||
|
||||
package akka.persistence.query.journal.leveldb
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.persistence.query.PersistenceQuery
|
||||
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
|
||||
import akka.persistence.query.scaladsl.PersistenceIdsQuery
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.testkit.scaladsl.TestSink
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.testkit.ImplicitSender
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object AllPersistenceIdsSpec {
|
||||
val config = """
|
||||
akka.loglevel = INFO
|
||||
|
|
@ -28,8 +27,6 @@ object AllPersistenceIdsSpec {
|
|||
|
||||
class AllPersistenceIdsSpec extends AkkaSpec(AllPersistenceIdsSpec.config) with Cleanup with ImplicitSender {
|
||||
|
||||
implicit val mat = ActorMaterializer()(system)
|
||||
|
||||
val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
|
||||
|
||||
"Leveldb query AllPersistenceIds" must {
|
||||
|
|
|
|||
|
|
@ -4,17 +4,16 @@
|
|||
|
||||
package akka.persistence.query.journal.leveldb
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.persistence.query.PersistenceQuery
|
||||
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
|
||||
import akka.persistence.query.scaladsl.EventsByTagQuery
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.testkit.scaladsl.TestSink
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.testkit.ImplicitSender
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object EventsByPersistenceIdSpec {
|
||||
val config = """
|
||||
akka.loglevel = INFO
|
||||
|
|
@ -30,8 +29,6 @@ object EventsByPersistenceIdSpec {
|
|||
|
||||
class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.config) with Cleanup with ImplicitSender {
|
||||
|
||||
implicit val mat = ActorMaterializer()(system)
|
||||
|
||||
val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
|
||||
|
||||
def setup(persistenceId: String): ActorRef = {
|
||||
|
|
|
|||
|
|
@ -4,17 +4,19 @@
|
|||
|
||||
package akka.persistence.query.journal.leveldb
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import akka.persistence.journal.Tagged
|
||||
import akka.persistence.journal.WriteEventAdapter
|
||||
import akka.persistence.query.{ EventEnvelope, PersistenceQuery, Sequence }
|
||||
import akka.persistence.query.NoOffset
|
||||
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
|
||||
import akka.persistence.query.scaladsl.EventsByTagQuery
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.persistence.query.EventEnvelope
|
||||
import akka.persistence.query.PersistenceQuery
|
||||
import akka.persistence.query.Sequence
|
||||
import akka.stream.testkit.scaladsl.TestSink
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.testkit.ImplicitSender
|
||||
import akka.persistence.query.NoOffset
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object EventsByTagSpec {
|
||||
val config = s"""
|
||||
|
|
@ -59,8 +61,6 @@ class ColorTagger extends WriteEventAdapter {
|
|||
|
||||
class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) with Cleanup with ImplicitSender {
|
||||
|
||||
implicit val mat = ActorMaterializer()(system)
|
||||
|
||||
val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
|
||||
|
||||
"Leveldb query EventsByTag" must {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue