=peq remove bridge from old APIs in PersistenceQuery

This commit is contained in:
Konrad Malawski 2016-12-14 15:11:54 +01:00
parent d31db86567
commit b036b555e4
14 changed files with 50 additions and 88 deletions

View file

@ -15,7 +15,7 @@ object Offset {
}
trait Offset
abstract class Offset
final case class Sequence(value: Long) extends Offset with Ordered[Sequence] {
override def compare(that: Sequence): Int = value.compare(that.value)

View file

@ -26,6 +26,7 @@ private[akka] object AllPersistenceIdsPublisher {
/**
* INTERNAL API
*/
// FIXME needs to be rewritten as a GraphStage (since 2.5.0)
private[akka] class AllPersistenceIdsPublisher(liveQuery: Boolean, maxBufSize: Int, writeJournalPluginId: String)
extends ActorPublisher[String] with DeliveryBuffer[String] with ActorLogging {

View file

@ -4,16 +4,14 @@
package akka.persistence.query.journal.leveldb
import scala.concurrent.duration._
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.{ ActorLogging, ActorRef, Cancellable, Props }
import akka.persistence.JournalProtocol._
import akka.persistence.Persistence
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.Cancel
import akka.stream.actor.ActorPublisherMessage.Request
import akka.persistence.journal.leveldb.LeveldbJournal
import akka.persistence.query.EventEnvelope
import akka.persistence.query.{ EventEnvelope, Sequence }
/**
* INTERNAL API
@ -40,6 +38,7 @@ private[akka] object EventsByPersistenceIdPublisher {
/**
* INTERNAL API
*/
// FIXME needs to be rewritten as a GraphStage (since 2.5.0)
private[akka] abstract class AbstractEventsByPersistenceIdPublisher(
val persistenceId: String, val fromSequenceNr: Long,
val maxBufSize: Int, val writeJournalPluginId: String)
@ -89,7 +88,7 @@ private[akka] abstract class AbstractEventsByPersistenceIdPublisher(
def replaying(limit: Int): Receive = {
case ReplayedMessage(p)
buf :+= EventEnvelope(
offset = p.sequenceNr,
offset = Sequence(p.sequenceNr),
persistenceId = persistenceId,
sequenceNr = p.sequenceNr,
event = p.payload)
@ -120,6 +119,7 @@ private[akka] abstract class AbstractEventsByPersistenceIdPublisher(
/**
* INTERNAL API
*/
// FIXME needs to be rewritten as a GraphStage (since 2.5.0)
private[akka] class LiveEventsByPersistenceIdPublisher(
persistenceId: String, fromSequenceNr: Long, override val toSequenceNr: Long,
refreshInterval: FiniteDuration,
@ -128,7 +128,7 @@ private[akka] class LiveEventsByPersistenceIdPublisher(
persistenceId, fromSequenceNr, maxBufSize, writeJournalPluginId) {
import EventsByPersistenceIdPublisher._
val tickTask =
val tickTask: Cancellable =
context.system.scheduler.schedule(refreshInterval, refreshInterval, self, Continue)(context.dispatcher)
override def postStop(): Unit =

View file

@ -4,16 +4,14 @@
package akka.persistence.query.journal.leveldb
import scala.concurrent.duration._
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.{ ActorLogging, ActorRef, Cancellable, Props }
import akka.persistence.JournalProtocol._
import akka.persistence.Persistence
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.Cancel
import akka.stream.actor.ActorPublisherMessage.Request
import akka.persistence.journal.leveldb.LeveldbJournal
import akka.persistence.query.EventEnvelope
import akka.persistence.query.{ EventEnvelope, Sequence }
import akka.persistence.journal.leveldb.LeveldbJournal.ReplayTaggedMessages
import akka.persistence.journal.leveldb.LeveldbJournal.ReplayedTaggedMessage
@ -42,6 +40,7 @@ private[akka] object EventsByTagPublisher {
/**
* INTERNAL API
*/
// FIXME needs to be rewritten as a GraphStage
private[akka] abstract class AbstractEventsByTagPublisher(
val tag: String, val fromOffset: Long,
val maxBufSize: Int, val writeJournalPluginId: String)
@ -91,7 +90,7 @@ private[akka] abstract class AbstractEventsByTagPublisher(
def replaying(limit: Int): Receive = {
case ReplayedTaggedMessage(p, _, offset)
buf :+= EventEnvelope(
offset = offset,
offset = Sequence(offset),
persistenceId = p.persistenceId,
sequenceNr = p.sequenceNr,
event = p.payload)
@ -122,6 +121,7 @@ private[akka] abstract class AbstractEventsByTagPublisher(
/**
* INTERNAL API
*/
// FIXME needs to be rewritten as a GraphStage (since 2.5.0)
private[akka] class LiveEventsByTagPublisher(
tag: String, fromOffset: Long, override val toOffset: Long,
refreshInterval: FiniteDuration,
@ -130,7 +130,7 @@ private[akka] class LiveEventsByTagPublisher(
tag, fromOffset, maxBufSize, writeJournalPluginId) {
import EventsByTagPublisher._
val tickTask =
val tickTask: Cancellable =
context.system.scheduler.schedule(refreshInterval, refreshInterval, self, Continue)(context.dispatcher)
override def postStop(): Unit =
@ -159,6 +159,7 @@ private[akka] class LiveEventsByTagPublisher(
/**
* INTERNAL API
*/
// FIXME needs to be rewritten as a GraphStage (since 2.5.0)
private[akka] class CurrentEventsByTagPublisher(
tag: String, fromOffset: Long, var _toOffset: Long,
maxBufSize: Int, writeJournalPluginId: String)

View file

@ -4,7 +4,7 @@
package akka.persistence.query.journal.leveldb.javadsl
import akka.NotUsed
import akka.persistence.query.{ EventEnvelope, EventEnvelope, Offset }
import akka.persistence.query.{ EventEnvelope, Offset }
import akka.persistence.query.javadsl._
import akka.stream.javadsl.Source
@ -26,14 +26,9 @@ import akka.stream.javadsl.Source
*/
class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal)
extends ReadJournal
with PersistenceIdsQuery
with CurrentPersistenceIdsQuery
with EventsByPersistenceIdQuery
with CurrentEventsByPersistenceIdQuery
with EventsByTagQuery
with EventsByTagQuery
with CurrentEventsByTagQuery
with CurrentEventsByTagQuery {
with PersistenceIdsQuery with CurrentPersistenceIdsQuery
with EventsByPersistenceIdQuery with CurrentEventsByPersistenceIdQuery
with EventsByTagQuery with CurrentEventsByTagQuery {
/**
* `allPersistenceIds` is used for retrieving all `persistenceIds` of all
@ -141,9 +136,6 @@ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.lev
override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
scaladslReadJournal.eventsByTag(tag, offset).asJava
override def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] =
scaladslReadJournal.eventsByTag(tag, offset).asJava
/**
* Same type of query as [[#eventsByTag]] but the event stream
* is completed immediately when it reaches the end of the "result set". Events that are
@ -152,8 +144,6 @@ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.lev
override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
scaladslReadJournal.currentEventsByTag(tag, offset).asJava
override def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] =
scaladslReadJournal.currentEventsByTag(tag, offset).asJava
}
object LeveldbReadJournal {

View file

@ -6,20 +6,17 @@ package akka.persistence.query.journal.leveldb.scaladsl
import java.net.URLEncoder
import akka.NotUsed
import scala.concurrent.duration._
import akka.actor.ExtendedActorSystem
import akka.persistence.query.{ EventEnvelope, EventEnvelope, Offset, Sequence }
import akka.persistence.query.journal.leveldb.AllPersistenceIdsPublisher
import akka.persistence.query.journal.leveldb.EventsByPersistenceIdPublisher
import akka.persistence.query.journal.leveldb.EventsByTagPublisher
import akka.persistence.query.scaladsl._
import akka.persistence.query.scaladsl.ReadJournal
import akka.serialization.SerializationExtension
import akka.stream.scaladsl.{ Flow, Source }
import akka.event.Logging
import akka.persistence.query.journal.leveldb.{ AllPersistenceIdsPublisher, EventsByPersistenceIdPublisher, EventsByTagPublisher }
import akka.persistence.query.scaladsl.{ ReadJournal, _ }
import akka.persistence.query.{ EventEnvelope, Offset, Sequence }
import akka.stream.scaladsl.Source
import akka.util.ByteString
import com.typesafe.config.Config
import scala.concurrent.duration._
/**
* Scala API [[akka.persistence.query.scaladsl.ReadJournal]] implementation for LevelDB.
*
@ -35,25 +32,14 @@ import com.typesafe.config.Config
* for the default [[LeveldbReadJournal#Identifier]]. See `reference.conf`.
*/
class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends ReadJournal
with PersistenceIdsQuery
with CurrentPersistenceIdsQuery
with EventsByPersistenceIdQuery
with CurrentEventsByPersistenceIdQuery
with EventsByTagQuery
with EventsByTagQuery
with CurrentEventsByTagQuery
with CurrentEventsByTagQuery {
with PersistenceIdsQuery with CurrentPersistenceIdsQuery
with EventsByPersistenceIdQuery with CurrentEventsByPersistenceIdQuery
with EventsByTagQuery with CurrentEventsByTagQuery {
private val serialization = SerializationExtension(system)
private val refreshInterval = Some(config.getDuration("refresh-interval", MILLISECONDS).millis)
private val writeJournalPluginId: String = config.getString("write-plugin")
private val maxBufSize: Int = config.getInt("max-buffer-size")
private val envelopetoEnvelope2 = Flow[EventEnvelope].map {
case EventEnvelope(offset, persistenceId, sequenceNr, event)
EventEnvelope(Sequence(offset), persistenceId, sequenceNr, event)
}
/**
* `allPersistenceIds` is used for retrieving all `persistenceIds` of all
* persistent actors.
@ -73,8 +59,7 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re
* backend journal.
*/
override def persistenceIds(): Source[String, NotUsed] = {
// no polling for this query, the write journal will push all changes, i.e.
// no refreshInterval
// no polling for this query, the write journal will push all changes, i.e. no refreshInterval
Source.actorPublisher[String](AllPersistenceIdsPublisher.props(liveQuery = true, maxBufSize, writeJournalPluginId))
.mapMaterializedValue(_ NotUsed)
.named("allPersistenceIds")
@ -103,7 +88,7 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re
*
* The returned event stream is ordered by sequence number, i.e. the same order as the
* `PersistentActor` persisted the events. The same prefix of stream elements (in same order)
* are returned for multiple executions of the query, except for when events have been deleted.
* are returned for multiple executions of the query, except for when events have been deleted.
*
* The stream is not completed when it reaches the end of the currently stored events,
* but it continues to push new events when new events are persisted.
@ -173,18 +158,15 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re
*/
override def eventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] =
offset match {
case Sequence(offsetValue)
eventsByTag(tag, offsetValue).via(envelopetoEnvelope2)
case _
throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets")
}
case seq: Sequence
Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, seq.value, Long.MaxValue,
refreshInterval, maxBufSize, writeJournalPluginId))
.mapMaterializedValue(_ NotUsed)
.named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8))
override def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] = {
Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, offset, Long.MaxValue,
refreshInterval, maxBufSize, writeJournalPluginId))
.mapMaterializedValue(_ NotUsed)
.named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8))
}
case _
throw new IllegalArgumentException("LevelDB does not support " + Logging.simpleName(offset.getClass) + " offsets")
}
/**
* Same type of query as [[#eventsByTag]] but the event stream
@ -193,19 +175,15 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re
*/
override def currentEventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] =
offset match {
case Sequence(offsetValue)
currentEventsByTag(tag, offsetValue).via(envelopetoEnvelope2)
case seq: Sequence
Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, seq.value, Long.MaxValue,
None, maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ NotUsed)
.named("currentEventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8))
case _
throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets")
throw new IllegalArgumentException("LevelDB does not support " + Logging.simpleName(offset.getClass) + " offsets")
}
override def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] = {
Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, offset, Long.MaxValue,
None, maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ NotUsed)
.named("currentEventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8))
}
}
object LeveldbReadJournal {

View file

@ -6,7 +6,7 @@ package akka.persistence.query.journal.leveldb
import scala.concurrent.duration._
import akka.persistence.journal.Tagged
import akka.persistence.journal.WriteEventAdapter
import akka.persistence.query.{ EventEnvelope, EventEnvelope, PersistenceQuery, Sequence }
import akka.persistence.query.{ EventEnvelope, PersistenceQuery, Sequence }
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.query.scaladsl.EventsByTagQuery
import akka.stream.ActorMaterializer