Rename sbt akka modules

Co-authored-by: Sean Glover <sean@seanglover.com>
Matthew de Detrich 2023-01-05 11:10:50 +01:00 committed by GitHub
parent b92b749946
commit 24c03cde19
2930 changed files with 1466 additions and 1462 deletions

@@ -0,0 +1,24 @@
/*
* Copyright (C) 2014-2022 Lightbend Inc. <https://www.lightbend.com>
*/
syntax = "proto2";
package org.apache.pekko.persistence.query;
option java_package = "org.apache.pekko.persistence.query.internal.protobuf";
option optimize_for = SPEED;
import "ContainerFormats.proto";
// for org.apache.pekko.persistence.query.typed.EventEnvelope
message EventEnvelope {
required string persistence_id = 1;
required string entity_type = 2;
required int32 slice = 3;
required int64 sequence_nr = 4;
required int64 timestamp = 5;
required string offset = 6;
required string offset_manifest = 7;
optional Payload event = 8;
optional Payload metadata = 9;
}

@@ -0,0 +1,42 @@
########################################################
# Pekko Persistence Query Reference Configuration File #
########################################################
# This is the reference config file that contains all the default settings.
# Make your edits in your application.conf in order to override these settings.
#//#query-leveldb
# Configuration for the LeveldbReadJournal
pekko.persistence.query.journal.leveldb {
# Implementation class of the LevelDB ReadJournalProvider
class = "org.apache.pekko.persistence.query.journal.leveldb.LeveldbReadJournalProvider"
# Absolute path to the write journal plugin configuration entry that this
# query journal will connect to. That must be a LeveldbJournal or SharedLeveldbJournal.
# If undefined (or "") it will connect to the default journal as specified by the
# pekko.persistence.journal.plugin property.
write-plugin = ""
# The LevelDB write journal notifies the query side as soon as events
# are persisted, but for efficiency reasons the query side retrieves the events
# in batches that can sometimes be delayed up to the configured `refresh-interval`.
refresh-interval = 3s
# How many events to fetch in one query (replay) and keep buffered until they
# are delivered downstream.
max-buffer-size = 100
}
#//#query-leveldb
pekko.actor {
serializers {
pekko-persistence-query = "org.apache.pekko.persistence.query.internal.QuerySerializer"
}
serialization-bindings {
"org.apache.pekko.persistence.query.typed.EventEnvelope" = pekko-persistence-query
"org.apache.pekko.persistence.query.Offset" = pekko-persistence-query
}
serialization-identifiers {
"org.apache.pekko.persistence.query.internal.QuerySerializer" = 39
}
}
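
Everything above can be overridden from application.conf, as the header notes. A minimal sketch of such an override, applied programmatically here for illustration; the system name and the chosen values are assumptions:

import com.typesafe.config.ConfigFactory
import org.apache.pekko.actor.ActorSystem

// Hypothetical overrides; in a real project these lines would live in application.conf.
val overrides = ConfigFactory.parseString("""
  pekko.persistence.query.journal.leveldb {
    refresh-interval = 1s
    max-buffer-size = 500
  }
""")

// Fall back to application.conf/reference.conf for all other settings.
val system = ActorSystem("example", overrides.withFallback(ConfigFactory.load()))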

@@ -0,0 +1,74 @@
/*
* Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query
import org.apache.pekko.annotation.DoNotInherit
/**
* The stream element type of `DurableStateStoreQuery`.
*
* The implementation can be an [[UpdatedDurableState]] or a [[DeletedDurableState]].
*
* Not for user extension.
*
* @tparam A the type of the value
*/
@DoNotInherit
sealed trait DurableStateChange[A] {
/**
* The persistence id of the origin entity.
*/
def persistenceId: String
/**
* The offset that can be used in the next `changes` or `currentChanges` query.
*/
def offset: Offset
}
object UpdatedDurableState {
def unapply[A](arg: UpdatedDurableState[A]): Option[(String, Long, A, Offset, Long)] =
Some((arg.persistenceId, arg.revision, arg.value, arg.offset, arg.timestamp))
}
/**
* @param persistenceId The persistence id of the origin entity.
* @param revision The revision number from the origin entity.
* @param value The object value.
* @param offset The offset that can be used in the next `changes` or `currentChanges` query.
* @param timestamp The time the state was stored, in milliseconds since midnight, January 1, 1970 UTC
* (same as `System.currentTimeMillis`).
* @tparam A the type of the value
*/
final class UpdatedDurableState[A](
val persistenceId: String,
val revision: Long,
val value: A,
override val offset: Offset,
val timestamp: Long)
extends DurableStateChange[A]
object DeletedDurableState {
def unapply[A](arg: DeletedDurableState[A]): Option[(String, Long, Offset, Long)] =
Some((arg.persistenceId, arg.revision, arg.offset, arg.timestamp))
}
/**
* @param persistenceId The persistence id of the origin entity.
* @param revision The revision number from the origin entity.
* @param offset The offset that can be used in the next `changes` or `currentChanges` query.
* @param timestamp The time the state was stored, in milliseconds since midnight, January 1, 1970 UTC
* (same as `System.currentTimeMillis`).
* @tparam A the type of the value
*/
final class DeletedDurableState[A](
val persistenceId: String,
val revision: Long,
override val offset: Offset,
val timestamp: Long)
extends DurableStateChange[A]
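
Since `DurableStateChange` is sealed with exactly the two implementations above, a match over the extractors is effectively exhaustive. A small sketch; the `describe` helper is hypothetical:

import org.apache.pekko.persistence.query.{ DeletedDurableState, DurableStateChange, UpdatedDurableState }

def describe[A](change: DurableStateChange[A]): String = change match {
  case UpdatedDurableState(pid, revision, value, _, _) =>
    s"$pid updated to revision $revision: $value"
  case DeletedDurableState(pid, revision, _, _) =>
    s"$pid deleted at revision $revision"
}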

@@ -0,0 +1,110 @@
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query
import java.util.Optional
import org.apache.pekko.annotation.InternalApi
import scala.runtime.AbstractFunction4
import org.apache.pekko
import pekko.util.HashCode
// for binary compatibility (used to be a case class)
object EventEnvelope extends AbstractFunction4[Offset, String, Long, Any, EventEnvelope] {
def apply(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any, timestamp: Long): EventEnvelope =
new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp, None)
def apply(
offset: Offset,
persistenceId: String,
sequenceNr: Long,
event: Any,
timestamp: Long,
meta: Option[Any]): EventEnvelope =
new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp, meta)
@deprecated("for binary compatibility", "2.6.2")
override def apply(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any): EventEnvelope =
new EventEnvelope(offset, persistenceId, sequenceNr, event)
def unapply(arg: EventEnvelope): Option[(Offset, String, Long, Any)] =
Some((arg.offset, arg.persistenceId, arg.sequenceNr, arg.event))
}
/**
* Event wrapper adding meta data for the events in the result stream of
* [[pekko.persistence.query.scaladsl.EventsByTagQuery]] query, or similar queries.
*
* The `timestamp` is the time the event was stored, in milliseconds since midnight, January 1, 1970 UTC
* (same as `System.currentTimeMillis`).
*/
final class EventEnvelope(
val offset: Offset,
val persistenceId: String,
val sequenceNr: Long,
val event: Any,
val timestamp: Long,
val eventMetadata: Option[Any])
extends Product4[Offset, String, Long, Any]
with Serializable {
@deprecated("for binary compatibility", "2.6.2")
def this(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any) =
this(offset, persistenceId, sequenceNr, event, 0L, None)
// bin compat 2.6.7
def this(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any, timestamp: Long) =
this(offset, persistenceId, sequenceNr, event, timestamp, None)
/**
* Java API
*/
def getEventMetaData(): Optional[Any] = {
import scala.compat.java8.OptionConverters._
eventMetadata.asJava
}
override def hashCode(): Int = {
var result = HashCode.SEED
result = HashCode.hash(result, offset)
result = HashCode.hash(result, persistenceId)
result = HashCode.hash(result, sequenceNr)
result = HashCode.hash(result, event)
result
}
override def equals(obj: Any): Boolean = obj match {
case other: EventEnvelope =>
offset == other.offset && persistenceId == other.persistenceId && sequenceNr == other.sequenceNr &&
event == other.event // timestamp && metadata not included in equals for backwards compatibility
case _ => false
}
override def toString: String =
s"EventEnvelope($offset,$persistenceId,$sequenceNr,$event,$timestamp,$eventMetadata)"
// for binary compatibility (used to be a case class)
def copy(
offset: Offset = this.offset,
persistenceId: String = this.persistenceId,
sequenceNr: Long = this.sequenceNr,
event: Any = this.event): EventEnvelope =
new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp, this.eventMetadata)
@InternalApi
private[pekko] def withMetadata(metadata: Any): EventEnvelope =
new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp, Some(metadata))
// Product4, for binary compatibility (used to be a case class)
override def productPrefix = "EventEnvelope"
override def _1: Offset = offset
override def _2: String = persistenceId
override def _3: Long = sequenceNr
override def _4: Any = event
override def canEqual(that: Any): Boolean = that.isInstanceOf[EventEnvelope]
}
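
One consequence of the `equals` definition above: envelopes that differ only in `timestamp` or metadata compare equal. A quick sketch using the factory methods from the companion object:

import org.apache.pekko.persistence.query.{ EventEnvelope, Offset }

val a = EventEnvelope(Offset.sequence(42L), "user-1", 42L, "created", System.currentTimeMillis())
val b = EventEnvelope(Offset.sequence(42L), "user-1", 42L, "created", 0L)

// timestamp and metadata are excluded from equals for backwards compatibility
assert(a == b)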

@@ -0,0 +1,122 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query
import java.time.Instant
import java.util.UUID
import org.apache.pekko
import pekko.annotation.ApiMayChange
import pekko.util.UUIDComparator
object Offset {
// factories to aid discoverability
def noOffset: Offset = NoOffset
def sequence(value: Long): Offset = Sequence(value)
def timeBasedUUID(uuid: UUID): Offset = TimeBasedUUID(uuid)
def timestamp(instant: Instant): TimestampOffset = TimestampOffset(instant, instant, Map.empty)
}
abstract class Offset
/**
* Corresponds to an ordered sequence number for the events. Note that the corresponding
* offset of each event is provided in the [[pekko.persistence.query.EventEnvelope]],
* which makes it possible to resume the stream at a later point from a given offset.
*
* The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included
* in the returned stream. This means that you can use the offset that is returned in `EventEnvelope`
* as the `offset` parameter in a subsequent query.
*/
final case class Sequence(value: Long) extends Offset with Ordered[Sequence] {
override def compare(that: Sequence): Int = value.compare(that.value)
}
/**
* Corresponds to an ordered unique identifier of the events. Note that the corresponding
* offset of each event is provided in the [[pekko.persistence.query.EventEnvelope]],
* which makes it possible to resume the stream at a later point from a given offset.
*
* The `offset` is exclusive, i.e. the event with the exact same UUID will not be included
* in the returned stream. This means that you can use the offset that is returned in `EventEnvelope`
* as the `offset` parameter in a subsequent query.
*/
final case class TimeBasedUUID(value: UUID) extends Offset with Ordered[TimeBasedUUID] {
if (value == null || value.version != 1) {
throw new IllegalArgumentException("UUID " + value + " is not a time-based UUID")
}
override def compare(other: TimeBasedUUID): Int = UUIDComparator.comparator.compare(value, other.value)
}
object TimestampOffset {
val Zero: TimestampOffset = TimestampOffset(Instant.EPOCH, Instant.EPOCH, Map.empty)
def apply(timestamp: Instant, seen: Map[String, Long]): TimestampOffset =
TimestampOffset(timestamp, Instant.EPOCH, seen)
/**
* Try to convert the Offset to a TimestampOffset. Epoch timestamp is used for `NoOffset`.
*/
def toTimestampOffset(offset: Offset): TimestampOffset = {
offset match {
case t: TimestampOffset => t
case NoOffset => TimestampOffset.Zero
case null => throw new IllegalArgumentException("Offset must not be null")
case other =>
throw new IllegalArgumentException(
s"Supported offset types are TimestampOffset and NoOffset, " +
s"received ${other.getClass.getName}")
}
}
}
/**
* Timestamp based offset. Since there can be several events for the same timestamp, it keeps
* track of which sequence numbers have been seen for each persistence id at this specific timestamp.
*
* The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included
* in the returned stream. This means that you can use the offset that is returned in `EventEnvelope`
* as the `offset` parameter in a subsequent query.
*
* API May Change
*
* @param timestamp
* time when the event was stored, microsecond granularity database timestamp
* @param readTimestamp
* time when the event was read, microsecond granularity database timestamp
* @param seen
* List of sequence nrs for every persistence id seen at this timestamp
*/
@ApiMayChange
final case class TimestampOffset(timestamp: Instant, readTimestamp: Instant, seen: Map[String, Long]) extends Offset {
/** Java API */
def getSeen(): java.util.Map[String, java.lang.Long] = {
import org.apache.pekko.util.ccompat.JavaConverters._
seen.map { case (pid, seqNr) => pid -> java.lang.Long.valueOf(seqNr) }.asJava
}
override def hashCode(): Int = timestamp.hashCode()
override def equals(obj: Any): Boolean =
obj match {
case other: TimestampOffset => timestamp == other.timestamp && seen == other.seen
case _ => false
}
}
/**
* Used when retrieving all events.
*/
case object NoOffset extends Offset {
/**
* Java API:
*/
def getInstance: Offset = this
}
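
A short sketch of the factory methods and the `NoOffset` conversion described above:

import java.time.Instant
import org.apache.pekko.persistence.query.{ NoOffset, Offset, TimestampOffset }

val seq = Offset.sequence(10L)
val ts = Offset.timestamp(Instant.now()) // a TimestampOffset with an empty `seen` map

// Epoch timestamp is used for NoOffset:
assert(TimestampOffset.toTimestampOffset(NoOffset) == TimestampOffset.Zero)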

@@ -0,0 +1,75 @@
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query
import scala.reflect.ClassTag
import com.typesafe.config.{ Config, ConfigFactory }
import org.apache.pekko
import pekko.actor._
import pekko.annotation.InternalApi
import pekko.persistence.{ PersistencePlugin, PluginProvider }
import pekko.persistence.query.scaladsl.ReadJournal
import pekko.util.unused
/**
* Persistence extension for queries.
*/
object PersistenceQuery extends ExtensionId[PersistenceQuery] with ExtensionIdProvider {
override def get(system: ActorSystem): PersistenceQuery = super.get(system)
override def get(system: ClassicActorSystemProvider): PersistenceQuery = super.get(system)
def createExtension(system: ExtendedActorSystem): PersistenceQuery = new PersistenceQuery(system)
def lookup: PersistenceQuery.type = PersistenceQuery
@InternalApi
private[pekko] val pluginProvider: PluginProvider[ReadJournalProvider, ReadJournal, javadsl.ReadJournal] =
new PluginProvider[ReadJournalProvider, scaladsl.ReadJournal, javadsl.ReadJournal] {
override def scalaDsl(t: ReadJournalProvider): ReadJournal = t.scaladslReadJournal()
override def javaDsl(t: ReadJournalProvider): javadsl.ReadJournal = t.javadslReadJournal()
}
}
class PersistenceQuery(system: ExtendedActorSystem)
extends PersistencePlugin[scaladsl.ReadJournal, javadsl.ReadJournal, ReadJournalProvider](system)(
ClassTag(classOf[ReadJournalProvider]),
PersistenceQuery.pluginProvider)
with Extension {
/**
* Scala API: Returns the [[pekko.persistence.query.scaladsl.ReadJournal]] specified by the given
* read journal configuration entry.
*
* The provided readJournalPluginConfig will be used to configure the journal plugin instead of the actor system
* config.
*/
final def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String, readJournalPluginConfig: Config): T =
pluginFor(readJournalPluginId, readJournalPluginConfig).scaladslPlugin.asInstanceOf[T]
/**
* Scala API: Returns the [[pekko.persistence.query.scaladsl.ReadJournal]] specified by the given
* read journal configuration entry.
*/
final def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String): T =
readJournalFor(readJournalPluginId, ConfigFactory.empty)
/**
* Java API: Returns the [[pekko.persistence.query.javadsl.ReadJournal]] specified by the given
* read journal configuration entry.
*/
final def getReadJournalFor[T <: javadsl.ReadJournal](
@unused clazz: Class[T],
readJournalPluginId: String,
readJournalPluginConfig: Config): T =
pluginFor(readJournalPluginId, readJournalPluginConfig).javadslPlugin.asInstanceOf[T]
final def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String): T =
getReadJournalFor[T](clazz, readJournalPluginId, ConfigFactory.empty())
}
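
Obtaining a read journal through the extension, matching the usage shown in the `LeveldbReadJournal` scaladoc further down; the system name is an assumption:

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.persistence.query.PersistenceQuery
import org.apache.pekko.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal

val system = ActorSystem("example")
val queries: LeveldbReadJournal =
  PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)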

@@ -0,0 +1,32 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query
/**
* A query plugin must provide a class that implements this trait, supplying the
* concrete implementations for the Java and Scala APIs.
*
* A read journal plugin must provide implementations for both
* `org.apache.pekko.persistence.query.scaladsl.ReadJournal` and `org.apache.pekko.persistence.query.javadsl.ReadJournal`.
* The plugin must implement both the `scaladsl` and the `javadsl` traits because
* `org.apache.pekko.stream.scaladsl.Source` and `org.apache.pekko.stream.javadsl.Source` are different types,
* and even though those types can easily be converted to each other, it is most convenient
* for the end user to get access to the Java or Scala `Source` directly.
* One of the implementations can delegate to the other.
*/
trait ReadJournalProvider {
/**
* The `ReadJournal` implementation for the Scala API.
* This corresponds to the instance that is returned by [[PersistenceQuery#readJournalFor]].
*/
def scaladslReadJournal(): scaladsl.ReadJournal
/**
* The `ReadJournal` implementation for the Java API.
* This corresponds to the instance that is returned by [[PersistenceQuery#getReadJournalFor]].
*/
def javadslReadJournal(): javadsl.ReadJournal
}
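
A minimal provider sketch following the delegation advice above. The `My*` classes are hypothetical, and a real plugin would typically also take an `ExtendedActorSystem` and a `Config` in its constructor:

import org.apache.pekko.persistence.query.{ javadsl, scaladsl, ReadJournalProvider }

// ReadJournal is a marker trait in both DSLs, so empty classes suffice for the sketch.
class MyScaladslReadJournal extends scaladsl.ReadJournal
class MyJavadslReadJournal(delegate: MyScaladslReadJournal) extends javadsl.ReadJournal

class MyReadJournalProvider extends ReadJournalProvider {
  private val journal = new MyScaladslReadJournal
  override def scaladslReadJournal(): scaladsl.ReadJournal = journal
  // the Java API implementation delegates to the Scala one
  override def javadslReadJournal(): javadsl.ReadJournal = new MyJavadslReadJournal(journal)
}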

@@ -0,0 +1,223 @@
/*
* Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.internal
import java.io.NotSerializableException
import java.nio.charset.StandardCharsets.UTF_8
import java.time.Instant
import java.util.Base64
import java.util.UUID
import scala.util.control.NonFatal
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.event.Logging
import pekko.persistence.query.NoOffset
import pekko.persistence.query.Offset
import pekko.persistence.query.Sequence
import pekko.persistence.query.TimeBasedUUID
import pekko.persistence.query.TimestampOffset
import pekko.persistence.query.internal.protobuf.QueryMessages
import pekko.persistence.query.typed.EventEnvelope
import pekko.remote.serialization.WrappedPayloadSupport.{ deserializePayload, payloadBuilder }
import pekko.serialization.BaseSerializer
import pekko.serialization.SerializationExtension
import pekko.serialization.SerializerWithStringManifest
import pekko.serialization.Serializers
/**
* INTERNAL API
*
* Serializer for [[EventEnvelope]] and [[Offset]].
*/
@InternalApi private[pekko] final class QuerySerializer(val system: pekko.actor.ExtendedActorSystem)
extends SerializerWithStringManifest
with BaseSerializer {
private val log = Logging(system, classOf[QuerySerializer])
private lazy val serialization = SerializationExtension(system)
private final val EventEnvelopeManifest = "a"
private final val SequenceOffsetManifest = "SEQ"
private final val TimeBasedUUIDOffsetManifest = "TBU"
private final val TimestampOffsetManifest = "TSO"
private final val NoOffsetManifest = "NO"
private val manifestSeparator = ':'
// persistenceId and timestamp must not contain this separator char
private val timestampOffsetSeparator = ';'
override def manifest(o: AnyRef): String = o match {
case _: EventEnvelope[_] => EventEnvelopeManifest
case offset: Offset => toStorageRepresentation(offset)._2
case _ =>
throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
}
override def toBinary(o: AnyRef): Array[Byte] = o match {
case env: EventEnvelope[_] =>
val builder = QueryMessages.EventEnvelope.newBuilder()
val (offset, offsetManifest) = toStorageRepresentation(env.offset)
builder
.setPersistenceId(env.persistenceId)
.setEntityType(env.entityType)
.setSlice(env.slice)
.setSequenceNr(env.sequenceNr)
.setTimestamp(env.timestamp)
.setOffset(offset)
.setOffsetManifest(offsetManifest)
env.eventOption.foreach(event => builder.setEvent(payloadBuilder(event, serialization, log)))
env.eventMetadata.foreach(meta => builder.setMetadata(payloadBuilder(meta, serialization, log)))
builder.build().toByteArray()
case offset: Offset =>
toStorageRepresentation(offset)._1.getBytes(UTF_8)
case _ =>
throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]")
}
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
case EventEnvelopeManifest =>
val env = QueryMessages.EventEnvelope.parseFrom(bytes)
val offset = fromStorageRepresentation(env.getOffset, env.getOffsetManifest)
val eventOption =
if (env.hasEvent) Option(deserializePayload(env.getEvent, serialization))
else None
val metaOption =
if (env.hasMetadata) Option(deserializePayload(env.getMetadata, serialization))
else None
new EventEnvelope(
offset,
env.getPersistenceId,
env.getSequenceNr,
eventOption,
env.getTimestamp,
metaOption,
env.getEntityType,
env.getSlice)
case _ =>
fromStorageRepresentation(new String(bytes, UTF_8), manifest)
}
/**
* Deserialize an offset from a stored string representation and manifest.
* The offset is converted from its string representation to its real type.
*/
private def fromStorageRepresentation(offsetStr: String, manifest: String): Offset = {
manifest match {
case TimestampOffsetManifest => timestampOffsetFromStorageRepresentation(offsetStr)
case SequenceOffsetManifest => Offset.sequence(offsetStr.toLong)
case TimeBasedUUIDOffsetManifest => Offset.timeBasedUUID(UUID.fromString(offsetStr))
case NoOffsetManifest => NoOffset
case _ =>
manifest.split(manifestSeparator) match {
case Array(serializerIdStr, serializerManifest) =>
val serializerId = serializerIdStr.toInt
val bytes = Base64.getDecoder.decode(offsetStr)
serialization.deserialize(bytes, serializerId, serializerManifest).get match {
case offset: Offset => offset
case other =>
throw new NotSerializableException(
s"Unimplemented deserialization of offset with serializerId [$serializerId] and manifest [$manifest] " +
s"in [${getClass.getName}]. [${other.getClass.getName}] is not an Offset.")
}
case _ =>
throw new NotSerializableException(
s"Unimplemented deserialization of offset with manifest [$manifest] " +
s"in [${getClass.getName}]. [$manifest] doesn't contain two parts.")
}
}
}
/**
* Convert the offset to a tuple (String, String) where the first element is
* the String representation of the offset and the second element is its manifest.
*/
private def toStorageRepresentation(offset: Offset): (String, String) = {
offset match {
case t: TimestampOffset => (timestampOffsetToStorageRepresentation(t), TimestampOffsetManifest)
case seq: Sequence => (seq.value.toString, SequenceOffsetManifest)
case tbu: TimeBasedUUID => (tbu.value.toString, TimeBasedUUIDOffsetManifest)
case NoOffset => ("", NoOffsetManifest)
case _ =>
val obj = offset.asInstanceOf[AnyRef]
val serializer = serialization.findSerializerFor(obj)
val serializerId = serializer.identifier
val serializerManifest = Serializers.manifestFor(serializer, obj)
val bytes = serializer.toBinary(obj)
val offsetStr = Base64.getEncoder.encodeToString(bytes)
if (serializerManifest.contains(manifestSeparator))
throw new IllegalArgumentException(
s"Serializer manifest [$serializerManifest] for " +
s"offset [${offset.getClass.getName}] must not contain [$manifestSeparator] character.")
(offsetStr, s"$serializerId$manifestSeparator$serializerManifest")
}
}
private def timestampOffsetFromStorageRepresentation(str: String): TimestampOffset = {
try {
str.split(timestampOffsetSeparator) match {
case Array(timestamp, readTimestamp, pid, seqNr) =>
// optimized for the normal case
TimestampOffset(Instant.parse(timestamp), Instant.parse(readTimestamp), Map(pid -> seqNr.toLong))
case Array(timestamp) =>
TimestampOffset(Instant.parse(timestamp), Map.empty)
case Array(timestamp, readTimestamp) =>
TimestampOffset(Instant.parse(timestamp), Instant.parse(readTimestamp), Map.empty)
case parts =>
val seen = parts.toList
.drop(2)
.grouped(2)
.map {
case pid :: seqNr :: Nil => pid -> seqNr.toLong
case _ =>
throw new IllegalArgumentException(
s"Invalid representation of Map(pid -> seqNr) [${parts.toList.drop(1).mkString(",")}]")
}
.toMap
TimestampOffset(Instant.parse(parts(0)), Instant.parse(parts(1)), seen)
}
} catch {
case NonFatal(e) =>
throw new IllegalArgumentException(s"Unexpected serialized TimestampOffset format [$str].", e)
}
}
private def timestampOffsetToStorageRepresentation(offset: TimestampOffset): String = {
def checkSeparator(pid: String): Unit =
if (pid.contains(timestampOffsetSeparator))
throw new IllegalArgumentException(
s"persistenceId [$pid] in offset [$offset] " +
s"must not contain [$timestampOffsetSeparator] character")
val str = new java.lang.StringBuilder
str.append(offset.timestamp).append(timestampOffsetSeparator).append(offset.readTimestamp)
if (offset.seen.size == 1) {
// optimized for the normal case
val pid = offset.seen.head._1
checkSeparator(pid)
val seqNr = offset.seen.head._2
str.append(timestampOffsetSeparator).append(pid).append(timestampOffsetSeparator).append(seqNr)
} else if (offset.seen.nonEmpty) {
offset.seen.toList.sortBy(_._1).foreach {
case (pid, seqNr) =>
checkSeparator(pid)
str.append(timestampOffsetSeparator).append(pid).append(timestampOffsetSeparator).append(seqNr)
}
}
str.toString
}
}
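
Given the serializer bindings in the reference configuration above, offsets round-trip through this serializer. A sketch, assuming an actor system named `example`:

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.persistence.query.{ Offset, Sequence }
import org.apache.pekko.serialization.{ SerializationExtension, SerializerWithStringManifest }

val system = ActorSystem("example")
val serialization = SerializationExtension(system)

val offset = Offset.sequence(42L)
val serializer = serialization
  .serializerFor(offset.getClass)
  .asInstanceOf[SerializerWithStringManifest] // resolves to QuerySerializer (identifier 39)

val manifest = serializer.manifest(offset) // "SEQ"
val bytes = serializer.toBinary(offset)    // UTF-8 "42"
assert(serializer.fromBinary(bytes, manifest) == Sequence(42L))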

@@ -0,0 +1,28 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.javadsl
import org.apache.pekko
import pekko.NotUsed
import pekko.persistence.query.EventEnvelope
import pekko.stream.javadsl.Source
/**
* A plugin may optionally support this query by implementing this interface.
*/
trait CurrentEventsByPersistenceIdQuery extends ReadJournal {
/**
* Same type of query as [[EventsByPersistenceIdQuery#eventsByPersistenceId]]
* but the event stream is completed immediately when it reaches the end of
* the "result set". Events that are stored after the query is completed are
* not included in the event stream.
*/
def currentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Source[EventEnvelope, NotUsed]
}

@@ -0,0 +1,26 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.javadsl
import org.apache.pekko
import pekko.NotUsed
import pekko.persistence.query.{ EventEnvelope, Offset }
import pekko.stream.javadsl.Source
/**
* A plugin may optionally support this query by implementing this interface.
*/
trait CurrentEventsByTagQuery extends ReadJournal {
/**
* Same type of query as [[EventsByTagQuery#eventsByTag]] but the event stream
* is completed immediately when it reaches the end of the "result set". Depending
* on journal implementation, this may mean all events up to when the query is
* started, or it may include events that are persisted while the query is still
* streaming results. For eventually consistent stores, it may only include all
* events up to some point before the query is started.
*/
def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
}

@@ -0,0 +1,23 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.javadsl
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.javadsl.Source
/**
* A plugin may optionally support this query by implementing this interface.
*/
trait CurrentPersistenceIdsQuery extends ReadJournal {
/**
* Same type of query as [[PersistenceIdsQuery#persistenceIds]] but the stream
* is completed immediately when it reaches the end of the "result set". Persistent
* actors that are created after the query is completed are not included in the stream.
*/
def currentPersistenceIds(): Source[String, NotUsed]
}

@@ -0,0 +1,33 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.javadsl
import java.util.Optional
import org.apache.pekko
import pekko.NotUsed
import pekko.persistence.state.javadsl.DurableStateStore
import pekko.stream.javadsl.Source
/**
* A DurableStateStore may optionally support this query by implementing this trait.
*/
trait DurableStateStorePagedPersistenceIdsQuery[A] extends DurableStateStore[A] {
/**
* Get the current persistence ids.
*
* Not all plugins support in-database paging; such plugins may simply use drop/take stream
* operators to manipulate the result set according to the paging parameters.
*
* @param afterId The ID to start returning results from, or empty to return all ids. This should be an id returned
* from a previous invocation of this command. Callers should not assume that ids are returned in
* sorted order.
* @param limit The maximum results to return. Use Long.MAX_VALUE to return all results. Must be greater than zero.
* @return A source containing all the persistence ids, limited as specified.
*/
def currentPersistenceIds(afterId: Optional[String], limit: Long): Source[String, NotUsed]
}
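
A sketch of the paging contract described above: pass an empty `afterId` for the first page, then the last id returned by the previous page; the page size of 100 is an assumption:

import java.util.Optional
import org.apache.pekko.NotUsed
import org.apache.pekko.persistence.query.javadsl.DurableStateStorePagedPersistenceIdsQuery
import org.apache.pekko.stream.javadsl.Source

def firstPage[A](store: DurableStateStorePagedPersistenceIdsQuery[A]): Source[String, NotUsed] =
  store.currentPersistenceIds(Optional.empty[String](), 100L)

def nextPage[A](store: DurableStateStorePagedPersistenceIdsQuery[A], lastId: String): Source[String, NotUsed] =
  store.currentPersistenceIds(Optional.of(lastId), 100L)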

@@ -0,0 +1,62 @@
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.javadsl
import org.apache.pekko
import pekko.NotUsed
import pekko.persistence.state.javadsl.DurableStateStore
import pekko.persistence.query.DurableStateChange
import pekko.persistence.query.Offset
import pekko.stream.javadsl.Source
/**
* Query API for reading durable state objects.
*
* For Scala API see [[pekko.persistence.query.scaladsl.DurableStateStoreQuery]].
*/
trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
/**
* Get a source of the most recent changes made to objects with the given tag since the passed in offset.
*
* Note that this only returns the most recent change to each object; if an object has been updated multiple times
* since the offset, only the most recent of those changes will be part of the stream.
*
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results.
*
* The [[DurableStateChange]] elements can be [[pekko.persistence.query.UpdatedDurableState]] or
* [[pekko.persistence.query.DeletedDurableState]].
*
* @param tag The tag to get changes for.
* @param offset The offset to get changes since. Must either be [[pekko.persistence.query.NoOffset]] to get
* changes since the beginning of time, or an offset that has been previously returned by this query.
* Any other offsets are invalid.
* @return A source of change in state.
*/
def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]
/**
* Get a source of the most recent changes made to objects of the given tag since the passed in offset.
*
* The returned source will never terminate, it effectively watches for changes to the objects and emits changes as
* they happen.
*
* Not all changes that occur are guaranteed to be emitted; this call only guarantees that eventually, the most
* recent change for each object since the offset will be emitted. In particular, multiple updates to a given object
* in quick succession are likely to be skipped, with only the last update resulting in a change from this
* source.
*
* The [[DurableStateChange]] elements can be [[pekko.persistence.query.UpdatedDurableState]] or
* [[pekko.persistence.query.DeletedDurableState]].
*
* @param tag The tag to get changes for.
* @param offset The offset to get changes since. Must either be [[pekko.persistence.query.NoOffset]] to get
* changes since the beginning of time, or an offset that has been previously returned by this query.
* Any other offsets are invalid.
* @return A source of change in state.
*/
def changes(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]
}
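
As a sketch of consuming this query, the Scala counterpart `pekko.persistence.query.scaladsl.DurableStateStoreQuery` can project the change stream into per-entity latest values; the tag name is an assumption:

import org.apache.pekko.NotUsed
import org.apache.pekko.persistence.query.{ DeletedDurableState, NoOffset, UpdatedDurableState }
import org.apache.pekko.persistence.query.scaladsl.DurableStateStoreQuery
import org.apache.pekko.stream.scaladsl.Source

def latest[A](store: DurableStateStoreQuery[A]): Source[(String, Option[A]), NotUsed] =
  store.changes("my-tag", NoOffset).map {
    case u: UpdatedDurableState[A] => u.persistenceId -> Some(u.value) // current value
    case d: DeletedDurableState[A] => d.persistenceId -> None          // object deleted
  }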

@@ -0,0 +1,36 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.javadsl
import org.apache.pekko
import pekko.NotUsed
import pekko.persistence.query.EventEnvelope
import pekko.stream.javadsl.Source
/**
* A plugin may optionally support this query by implementing this interface.
*/
trait EventsByPersistenceIdQuery extends ReadJournal {
/**
* Query events for a specific `PersistentActor` identified by `persistenceId`.
*
* You can retrieve a subset of all events by specifying `fromSequenceNr` and `toSequenceNr`
* or use `0L` and `Long.MAX_VALUE` respectively to retrieve all events. The query will
* return all the events inclusive of the `fromSequenceNr` and `toSequenceNr` values.
*
* The returned event stream should be ordered by sequence number.
*
* The stream is not completed when it reaches the end of the currently stored events,
* but it continues to push new events when new events are persisted.
* Corresponding query that is completed when it reaches the end of the currently
* stored events is provided by [[CurrentEventsByPersistenceIdQuery#currentEventsByPersistenceId]].
*/
def eventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Source[EventEnvelope, NotUsed]
}
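
A sketch of the bounds described above, using the Scala counterpart trait for brevity; `queries` is assumed to implement it:

import org.apache.pekko.NotUsed
import org.apache.pekko.persistence.query.EventEnvelope
import org.apache.pekko.persistence.query.scaladsl.EventsByPersistenceIdQuery
import org.apache.pekko.stream.scaladsl.Source

// 0L and Long.MaxValue retrieve all events of the entity, inclusive of both bounds.
def allEvents(queries: EventsByPersistenceIdQuery): Source[EventEnvelope, NotUsed] =
  queries.eventsByPersistenceId("user-1", 0L, Long.MaxValue)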

@@ -0,0 +1,49 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.javadsl
import org.apache.pekko
import pekko.NotUsed
import pekko.persistence.query.{ EventEnvelope, Offset }
import pekko.stream.javadsl.Source
/**
* A plugin may optionally support this query by implementing this interface.
*/
trait EventsByTagQuery extends ReadJournal {
/**
* Query events that have a specific tag. A tag can for example correspond to an
* aggregate root type (in DDD terminology).
*
* The consumer can keep track of its current position in the event stream by storing the
* `offset` and restart the query from a given `offset` after a crash/restart.
*
* The exact meaning of the `offset` depends on the journal and must be documented by the
* read journal plugin. It may be a sequential id number that uniquely identifies the
* position of each event within the event stream. Distributed data stores cannot easily
* support those semantics and they may use a weaker meaning. For example it may be a
* timestamp (taken when the event was created or stored). Timestamps are not unique and
* not strictly ordered, since clocks on different machines may not be synchronized.
*
* In strongly consistent stores, where the `offset` is unique and strictly ordered, the
* stream should start from the next event after the `offset`. Otherwise, the read journal
* should ensure that between an invocation that returned an event with the given
* `offset` and this invocation, no events are missed. Depending on the journal
* implementation, this may mean that this invocation will return events that were already
* returned by the previous invocation, including the event with the passed in `offset`.
*
* The returned event stream should be ordered by `offset` if possible, but this can also be
* difficult to fulfill for a distributed data store. The order must be documented by the
* read journal plugin.
*
* The stream is not completed when it reaches the end of the currently stored events,
* but it continues to push new events when new events are persisted.
* Corresponding query that is completed when it reaches the end of the currently
* stored events is provided by [[CurrentEventsByTagQuery#currentEventsByTag]].
*/
def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
}
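
Because the offset semantics above allow already-delivered events to be repeated after a restart, consumers typically store the latest offset and resume from it. A sketch with the Scala counterpart trait; `saveOffset` is a hypothetical application callback:

import org.apache.pekko.NotUsed
import org.apache.pekko.persistence.query.{ EventEnvelope, Offset }
import org.apache.pekko.persistence.query.scaladsl.EventsByTagQuery
import org.apache.pekko.stream.scaladsl.Source

def resume(queries: EventsByTagQuery, lastStored: Offset, saveOffset: Offset => Unit): Source[EventEnvelope, NotUsed] =
  queries.eventsByTag("order", lastStored).map { env =>
    saveOffset(env.offset) // persist for crash/restart resumption
    env
  }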

@@ -0,0 +1,32 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.javadsl
import java.util.Optional
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.javadsl.Source
/**
* A ReadJournal may optionally support this query by implementing this trait.
*/
trait PagedPersistenceIdsQuery extends ReadJournal {
/**
* Get the current persistence ids.
*
* Not all plugins support in-database paging; such plugins may simply use drop/take stream
* operators to manipulate the result set according to the paging parameters.
*
* @param afterId The ID to start returning results from, or empty to return all ids. This should be an id returned
* from a previous invocation of this command. Callers should not assume that ids are returned in
* sorted order.
* @param limit The maximum results to return. Use Long.MAX_VALUE to return all results. Must be greater than zero.
* @return A source containing all the persistence ids, limited as specified.
*/
def currentPersistenceIds(afterId: Optional[String], limit: Long): Source[String, NotUsed]
}

@@ -0,0 +1,27 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.javadsl
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.javadsl.Source
/**
* A plugin may optionally support this query by implementing this interface.
*/
trait PersistenceIdsQuery extends ReadJournal {
/**
* Query all `PersistentActor` identifiers, i.e. as defined by the
* `persistenceId` of the `PersistentActor`.
*
* The stream is not completed when it reaches the end of the currently used `persistenceIds`,
* but it continues to push new `persistenceIds` when new persistent actors are created.
* Corresponding query that is completed when it reaches the end of the currently
* used `persistenceIds` is provided by [[CurrentPersistenceIdsQuery#currentPersistenceIds]].
*/
def persistenceIds(): Source[String, NotUsed]
}

@@ -0,0 +1,29 @@
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.javadsl
/**
* API for reading persistent events and information derived
* from stored persistent events.
*
* The purpose of the API is not to enforce compatibility between different
* journal implementations, because the technical capabilities may be very different.
* The interface is very open so that different journals may implement specific queries.
*
* There are a few pre-defined queries that a query implementation may implement,
* such as [[EventsByPersistenceIdQuery]], [[PersistenceIdsQuery]] and [[EventsByTagQuery]]
* Implementation of these queries are optional and query (journal) plugins may define
* their own specialized queries by implementing other methods.
*
* Usage:
* {{{
* SomeCoolReadJournal journal =
* PersistenceQuery.get(system).getReadJournalFor(SomeCoolReadJournal.class, queryPluginConfigPath);
* Source<EventEnvolope, Unit> events = journal.eventsByTag("mytag", 0L);
* }}}
*
* For Scala API see [[org.apache.pekko.persistence.query.scaladsl.ReadJournal]].
*/
trait ReadJournal

@@ -0,0 +1,73 @@
/*
* Copyright (C) 2019-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.journal.leveldb
import org.apache.pekko
import pekko.actor.ActorRef
import pekko.annotation.InternalApi
import pekko.persistence.Persistence
import pekko.persistence.journal.leveldb.LeveldbJournal
import pekko.stream.Attributes
import pekko.stream.Materializer
import pekko.stream.Outlet
import pekko.stream.SourceShape
import pekko.stream.stage.GraphStage
import pekko.stream.stage.GraphStageLogic
import pekko.stream.stage.OutHandler
import pekko.stream.stage.TimerGraphStageLogicWithLogging
/**
* INTERNAL API
*/
@InternalApi
final private[pekko] class AllPersistenceIdsStage(liveQuery: Boolean, writeJournalPluginId: String, mat: Materializer)
extends GraphStage[SourceShape[String]] {
val out: Outlet[String] = Outlet("AllPersistenceIds.out")
override def shape: SourceShape[String] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[String] {
override def doPush(out: Outlet[String], elem: String): Unit = super.push(out, elem)
setHandler(out, this)
val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId)
var initialResponseReceived = false
override protected def logSource: Class[_] = classOf[AllPersistenceIdsStage]
override def preStart(): Unit = {
journal.tell(LeveldbJournal.SubscribeAllPersistenceIds, getStageActor(journalInteraction).ref)
}
private def journalInteraction(in: (ActorRef, Any)): Unit = {
val (_, msg) = in
msg match {
case LeveldbJournal.CurrentPersistenceIds(allPersistenceIds) =>
buffer(allPersistenceIds)
deliverBuf(out)
initialResponseReceived = true
if (!liveQuery && bufferEmpty)
completeStage()
case LeveldbJournal.PersistenceIdAdded(persistenceId) =>
if (liveQuery) {
buffer(persistenceId)
deliverBuf(out)
}
case _ => throw new RuntimeException() // compiler exhaustiveness check pleaser
}
}
override def onPull(): Unit = {
deliverBuf(out)
if (initialResponseReceived && !liveQuery && bufferEmpty)
completeStage()
}
}
}

@@ -0,0 +1,37 @@
/*
* Copyright (C) 2019-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.journal.leveldb
import java.util
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.stream.Outlet
import pekko.stream.stage.GraphStageLogic
import pekko.util.ccompat.JavaConverters._
/**
* INTERNAL API
*/
@InternalApi
private[leveldb] trait Buffer[T] { self: GraphStageLogic =>
def doPush(out: Outlet[T], elem: T): Unit
private val buf: java.util.LinkedList[T] = new util.LinkedList[T]()
def buffer(element: T): Unit = {
buf.add(element)
}
def buffer(all: Set[T]): Unit = {
buf.addAll(all.asJava)
}
def deliverBuf(out: Outlet[T]): Unit = {
if (!buf.isEmpty && isAvailable(out)) {
doPush(out, buf.remove())
}
}
def bufferSize: Int = buf.size
def bufferEmpty: Boolean = buf.isEmpty
}

@@ -0,0 +1,161 @@
/*
* Copyright (C) 2019-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.journal.leveldb
import scala.concurrent.duration.FiniteDuration
import org.apache.pekko
import pekko.actor.ActorRef
import pekko.annotation.InternalApi
import pekko.persistence.JournalProtocol.RecoverySuccess
import pekko.persistence.JournalProtocol.ReplayMessages
import pekko.persistence.JournalProtocol.ReplayMessagesFailure
import pekko.persistence.JournalProtocol.ReplayedMessage
import pekko.persistence.Persistence
import pekko.persistence.journal.leveldb.LeveldbJournal
import pekko.persistence.journal.leveldb.LeveldbJournal.EventAppended
import pekko.persistence.query.EventEnvelope
import pekko.persistence.query.Sequence
import pekko.persistence.query.journal.leveldb.EventsByPersistenceIdStage.Continue
import pekko.stream.Attributes
import pekko.stream.Materializer
import pekko.stream.Outlet
import pekko.stream.SourceShape
import pekko.stream.stage.GraphStage
import pekko.stream.stage.GraphStageLogic
import pekko.stream.stage.OutHandler
import pekko.stream.stage.TimerGraphStageLogicWithLogging
/**
* INTERNAL API
*/
@InternalApi
private[pekko] object EventsByPersistenceIdStage {
case object Continue
}
/**
* INTERNAL API
*/
@InternalApi
final private[pekko] class EventsByPersistenceIdStage(
persistenceId: String,
fromSequenceNr: Long,
initialToSequenceNr: Long,
maxBufSize: Int,
writeJournalPluginId: String,
refreshInterval: Option[FiniteDuration],
mat: Materializer)
extends GraphStage[SourceShape[EventEnvelope]] {
val out: Outlet[EventEnvelope] = Outlet("EventsByPersistenceIdSource")
override def shape: SourceShape[EventEnvelope] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[EventEnvelope] {
override def doPush(out: Outlet[EventEnvelope], elem: EventEnvelope): Unit = super.push(out, elem)
val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId)
var stageActorRef: ActorRef = null
var replayInProgress = false
var outstandingReplay = false
var nextSequenceNr = fromSequenceNr
var toSequenceNr = initialToSequenceNr
override protected def logSource: Class[_] = classOf[EventsByPersistenceIdStage]
override def preStart(): Unit = {
stageActorRef = getStageActor(journalInteraction).ref
refreshInterval.foreach(fd => {
scheduleWithFixedDelay(Continue, fd, fd)
journal.tell(LeveldbJournal.SubscribePersistenceId(persistenceId), stageActorRef)
})
requestMore()
}
private def requestMore(): Unit = {
if (!replayInProgress) {
val limit = maxBufSize - bufferSize
if (limit > 0 && nextSequenceNr <= toSequenceNr) {
replayInProgress = true
outstandingReplay = false
val request = ReplayMessages(nextSequenceNr, toSequenceNr, limit, persistenceId, stageActorRef)
journal ! request
}
} else {
outstandingReplay = true
}
}
override protected def onTimer(timerKey: Any): Unit = {
requestMore()
deliverBuf(out)
maybeCompleteStage()
}
private def journalInteraction(in: (ActorRef, Any)): Unit = {
val (_, msg) = in
msg match {
case ReplayedMessage(pr) =>
buffer(
EventEnvelope(
offset = Sequence(pr.sequenceNr),
persistenceId = pr.persistenceId,
sequenceNr = pr.sequenceNr,
event = pr.payload,
timestamp = pr.timestamp))
nextSequenceNr = pr.sequenceNr + 1
deliverBuf(out)
case RecoverySuccess(highestSeqNr) =>
replayInProgress = false
deliverBuf(out)
if (highestSeqNr < toSequenceNr && isCurrentQuery()) {
toSequenceNr = highestSeqNr
}
log.debug(
"Replay complete. From sequenceNr {} currentSequenceNr {} toSequenceNr {} buffer size {}",
fromSequenceNr,
nextSequenceNr,
toSequenceNr,
bufferSize)
if (bufferEmpty && (nextSequenceNr > toSequenceNr || (nextSequenceNr == fromSequenceNr && isCurrentQuery()))) {
completeStage()
} else if (nextSequenceNr < toSequenceNr) {
// need further requests to the journal
if (bufferSize < maxBufSize && (isCurrentQuery() || outstandingReplay)) {
requestMore()
}
}
case ReplayMessagesFailure(cause) =>
failStage(cause)
case EventAppended(_) =>
requestMore()
case _ => throw new RuntimeException() // compiler exhaustiveness check pleaser
}
}
private def isCurrentQuery(): Boolean = refreshInterval.isEmpty
private def maybeCompleteStage(): Unit = {
if (bufferEmpty && nextSequenceNr > toSequenceNr) {
completeStage()
}
}
override def onPull(): Unit = {
requestMore()
deliverBuf(out)
maybeCompleteStage()
}
setHandler(out, this)
}
}

@@ -0,0 +1,159 @@
/*
* Copyright (C) 2019-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.journal.leveldb
import scala.concurrent.duration.FiniteDuration
import org.apache.pekko
import pekko.actor.ActorRef
import pekko.annotation.InternalApi
import pekko.persistence.JournalProtocol.RecoverySuccess
import pekko.persistence.JournalProtocol.ReplayMessagesFailure
import pekko.persistence.Persistence
import pekko.persistence.journal.leveldb.LeveldbJournal
import pekko.persistence.journal.leveldb.LeveldbJournal.ReplayTaggedMessages
import pekko.persistence.journal.leveldb.LeveldbJournal.ReplayedTaggedMessage
import pekko.persistence.journal.leveldb.LeveldbJournal.TaggedEventAppended
import pekko.persistence.query.EventEnvelope
import pekko.persistence.query.Sequence
import pekko.persistence.query.journal.leveldb.EventsByTagStage.Continue
import pekko.stream.Attributes
import pekko.stream.Materializer
import pekko.stream.Outlet
import pekko.stream.SourceShape
import pekko.stream.stage.GraphStage
import pekko.stream.stage.GraphStageLogic
import pekko.stream.stage.OutHandler
import pekko.stream.stage.TimerGraphStageLogicWithLogging
/**
* INTERNAL API
*/
@InternalApi
private[pekko] object EventsByTagStage {
case object Continue
}
/**
* INTERNAL API
*/
final private[leveldb] class EventsByTagStage(
tag: String,
fromOffset: Long,
maxBufSize: Int,
initialToOffset: Long,
writeJournalPluginId: String,
refreshInterval: Option[FiniteDuration],
mat: Materializer)
extends GraphStage[SourceShape[EventEnvelope]] {
val out: Outlet[EventEnvelope] = Outlet("EventsByTagSource")
override def shape: SourceShape[EventEnvelope] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogicWithLogging(shape) with OutHandler with Buffer[EventEnvelope] {
override def doPush(out: Outlet[EventEnvelope], elem: EventEnvelope): Unit = super.push(out, elem)
val journal: ActorRef = Persistence(mat.system).journalFor(writeJournalPluginId)
var currOffset: Long = fromOffset
var toOffset: Long = initialToOffset
var stageActorRef: ActorRef = null
var replayInProgress = false
var outstandingReplay = false
override protected def logSource: Class[_] = classOf[EventsByTagStage]
override def preStart(): Unit = {
stageActorRef = getStageActor(journalInteraction).ref
refreshInterval.foreach(fd => {
scheduleWithFixedDelay(Continue, fd, fd)
journal.tell(LeveldbJournal.SubscribeTag(tag), stageActorRef)
})
requestMore()
}
override protected def onTimer(timerKey: Any): Unit = {
requestMore()
deliverBuf(out)
}
private def requestMore(): Unit = {
if (!replayInProgress) {
val limit = maxBufSize - bufferSize
if (limit > 0) {
replayInProgress = true
outstandingReplay = false
val request = ReplayTaggedMessages(currOffset, toOffset, limit, tag, stageActorRef)
journal ! request
}
} else {
outstandingReplay = true
}
}
private def journalInteraction(in: (ActorRef, Any)): Unit = {
val (_, msg) = in
msg match {
case ReplayedTaggedMessage(p, _, offset) =>
buffer(
EventEnvelope(
offset = Sequence(offset),
persistenceId = p.persistenceId,
sequenceNr = p.sequenceNr,
event = p.payload,
timestamp = p.timestamp))
currOffset = offset
deliverBuf(out)
case RecoverySuccess(highestSeqNr) =>
replayInProgress = false
deliverBuf(out)
log.debug(
"Replay complete. Current offset {} toOffset {} buffer size {} highestSeqNr {}",
currOffset,
toOffset,
bufferSize,
highestSeqNr)
// Set toOffset to know when to end the query for current queries
// live queries go on forever
if (highestSeqNr < toOffset && isCurrentQuery()) {
toOffset = highestSeqNr
}
if (currOffset < toOffset) {
// need further requests to the journal
if (bufferSize < maxBufSize && (isCurrentQuery() || outstandingReplay)) {
requestMore()
}
} else {
checkComplete()
}
case ReplayMessagesFailure(cause) =>
failStage(cause)
case TaggedEventAppended(_) =>
requestMore()
case _ => throw new RuntimeException() // compiler exhaustiveness check pleaser
}
}
private def isCurrentQuery(): Boolean = refreshInterval.isEmpty
private def checkComplete(): Unit = {
if (bufferEmpty && currOffset >= toOffset) {
completeStage()
}
}
override def onPull(): Unit = {
requestMore()
deliverBuf(out)
checkComplete()
}
setHandler(out, this)
}
}

@@ -0,0 +1,25 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.journal.leveldb
import com.typesafe.config.Config
import org.apache.pekko
import pekko.actor.ExtendedActorSystem
import pekko.persistence.query.ReadJournalProvider
@deprecated("Use another journal/query implementation", "2.6.15")
class LeveldbReadJournalProvider(system: ExtendedActorSystem, config: Config) extends ReadJournalProvider {
val readJournal: scaladsl.LeveldbReadJournal = new scaladsl.LeveldbReadJournal(system, config)
override def scaladslReadJournal(): pekko.persistence.query.scaladsl.ReadJournal =
readJournal
val javaReadJournal = new javadsl.LeveldbReadJournal(readJournal)
override def javadslReadJournal(): pekko.persistence.query.javadsl.ReadJournal =
javaReadJournal
}

@@ -0,0 +1,172 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.journal.leveldb.javadsl
import org.apache.pekko
import pekko.NotUsed
import pekko.persistence.query.{ EventEnvelope, Offset }
import pekko.persistence.query.javadsl._
import pekko.stream.javadsl.Source
/**
* Java API: [[pekko.persistence.query.javadsl.ReadJournal]] implementation for LevelDB.
*
* It is retrieved with:
* {{{
* LeveldbReadJournal queries =
* PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier());
* }}}
*
* Corresponding Scala API is in [[pekko.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal]].
*
* Configuration settings can be defined in the configuration section with the
* absolute path corresponding to the identifier, which is `"pekko.persistence.query.journal.leveldb"`
* for the default [[LeveldbReadJournal#Identifier]]. See `reference.conf`.
*/
@deprecated("Use another journal implementation", "2.6.15")
class LeveldbReadJournal(scaladslReadJournal: pekko.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal)
extends ReadJournal
with PersistenceIdsQuery
with CurrentPersistenceIdsQuery
with EventsByPersistenceIdQuery
with CurrentEventsByPersistenceIdQuery
with EventsByTagQuery
with CurrentEventsByTagQuery {
/**
* `persistenceIds` is used for retrieving all `persistenceIds` of all
* persistent actors.
*
* The returned event stream is unordered and you can expect different order for multiple
* executions of the query.
*
* The stream is not completed when it reaches the end of the currently used `persistenceIds`,
* but it continues to push new `persistenceIds` when new persistent actors are created.
* Corresponding query that is completed when it reaches the end of the currently
* used `persistenceIds` is provided by [[#currentPersistenceIds]].
*
* The LevelDB write journal notifies the query side as soon as new `persistenceIds` are
* created; there is no periodic polling or batching involved in this query.
*
* The stream is completed with failure if there is a failure in executing the query in the
* backend journal.
*/
override def persistenceIds(): Source[String, NotUsed] =
scaladslReadJournal.persistenceIds().asJava
/**
* Same type of query as [[#persistenceIds]] but the stream
* is completed immediately when it reaches the end of the "result set". Persistent
* actors that are created after the query is completed are not included in the stream.
*/
override def currentPersistenceIds(): Source[String, NotUsed] =
scaladslReadJournal.currentPersistenceIds().asJava
/**
* `eventsByPersistenceId` is used for retrieving events for a specific
* `PersistentActor` identified by `persistenceId`.
*
* You can retrieve a subset of all events by specifying `fromSequenceNr` and `toSequenceNr`
* or use `0L` and `Long.MaxValue` respectively to retrieve all events. Note that
* the corresponding sequence number of each event is provided in the
* [[pekko.persistence.query.EventEnvelope]], which makes it possible to resume the
* stream at a later point from a given sequence number.
*
* The returned event stream is ordered by sequence number, i.e. the same order as the
* `PersistentActor` persisted the events. The same prefix of stream elements (in same order)
* are returned for multiple executions of the query, except for when events have been deleted.
*
* The stream is not completed when it reaches the end of the currently stored events,
* but it continues to push new events when new events are persisted.
* Corresponding query that is completed when it reaches the end of the currently
* stored events is provided by [[#currentEventsByPersistenceId]].
*
* The LevelDB write journal notifies the query side as soon as events are persisted, but for
* efficiency reasons the query side retrieves the events in batches that can sometimes
* be delayed up to the configured `refresh-interval`.
*
* The stream is completed with failure if there is a failure in executing the query in the
* backend journal.
*/
override def eventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Source[EventEnvelope, NotUsed] =
scaladslReadJournal.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr).asJava
/**
* Same type of query as [[#eventsByPersistenceId]] but the event stream
* is completed immediately when it reaches the end of the "result set". Events that are
* stored after the query is completed are not included in the event stream.
*/
override def currentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Source[EventEnvelope, NotUsed] =
scaladslReadJournal.currentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr).asJava
/**
* `eventsByTag` is used for retrieving events that were marked with
* a given tag, e.g. all events of an Aggregate Root type.
*
* To tag events you create an [[pekko.persistence.journal.EventAdapter]] that wraps the events
* in a [[pekko.persistence.journal.Tagged]] with the given `tags`.
*
* You can retrieve a subset of all events by specifying `offset`, or use `0L` to retrieve all
* events with a given tag. The `offset` corresponds to an ordered sequence number for
* the specific tag. Note that the corresponding offset of each event is provided in the
* [[pekko.persistence.query.EventEnvelope]], which makes it possible to resume the
* stream at a later point from a given offset.
*
* In addition to the `offset` the `EventEnvelope` also provides `persistenceId` and `sequenceNr`
* for each event. The `sequenceNr` is the sequence number for the persistent actor with the
* `persistenceId` that persisted the event. The `persistenceId` + `sequenceNr` is a unique
* identifier for the event.
*
* The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included
* in the returned stream. This means that you can use the offset that is returned in `EventEnvelope`
* as the `offset` parameter in a subsequent query.
*
* The returned event stream is ordered by the offset (tag sequence number), which corresponds
* to the same order as the write journal stored the events. The same stream elements (in the same order)
* are returned for multiple executions of the query. Deleted events are not removed from the
* tagged event stream.
*
* The stream is not completed when it reaches the end of the currently stored events,
* but it continues to push new events when new events are persisted.
* The corresponding query that is completed when it reaches the end of the currently
* stored events is provided by [[#currentEventsByTag]].
*
* The LevelDB write journal notifies the query side as soon as tagged events are persisted, but for
* efficiency reasons the query side retrieves the events in batches that can sometimes
* be delayed up to the configured `refresh-interval`.
*
* The stream is completed with failure if there is a failure in executing the query in the
* backend journal.
*/
override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
scaladslReadJournal.eventsByTag(tag, offset).asJava
/**
* Same type of query as [[#eventsByTag]] but the event stream
* is completed immediately when it reaches the end of the "result set". Events that are
* stored after the query is completed are not included in the event stream.
*/
override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
scaladslReadJournal.currentEventsByTag(tag, offset).asJava
}
object LeveldbReadJournal {
/**
* The default identifier for [[LeveldbReadJournal]] to be used with
* [[pekko.persistence.query.PersistenceQuery#getReadJournalFor]].
*
* The value is `"pekko.persistence.query.journal.leveldb"` and corresponds
* to the absolute path to the read journal configuration entry.
*/
final val Identifier = pekko.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal.Identifier
}
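
A minimal usage sketch for this Java-API journal, assuming a running actor system (the system name below is a placeholder, not part of this file):

{{{
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.query.PersistenceQuery
import pekko.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal

// hypothetical actor system for the example
val system = ActorSystem("example")

// getReadJournalFor resolves the plugin from the config section named by Identifier
val queries: LeveldbReadJournal =
  PersistenceQuery.get(system).getReadJournalFor(classOf[LeveldbReadJournal], LeveldbReadJournal.Identifier)
}}}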

View file

@ -0,0 +1,282 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.journal.leveldb.scaladsl
import java.net.URLEncoder
import scala.concurrent.duration._
import com.typesafe.config.Config
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.ExtendedActorSystem
import pekko.event.Logging
import pekko.persistence.query.EventEnvelope
import pekko.persistence.query.NoOffset
import pekko.persistence.query.Offset
import pekko.persistence.query.Sequence
import pekko.persistence.query.journal.leveldb.AllPersistenceIdsStage
import pekko.persistence.query.journal.leveldb.EventsByPersistenceIdStage
import pekko.persistence.query.journal.leveldb.EventsByTagStage
import pekko.persistence.query.scaladsl._
import pekko.persistence.query.scaladsl.ReadJournal
import pekko.stream.scaladsl.Source
import pekko.util.ByteString
/**
* Scala API [[pekko.persistence.query.scaladsl.ReadJournal]] implementation for LevelDB.
*
* It is retrieved with:
* {{{
* val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
* }}}
*
* Corresponding Java API is in [[pekko.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal]].
*
* Configuration settings can be defined in the configuration section with the
* absolute path corresponding to the identifier, which is `"pekko.persistence.query.journal.leveldb"`
* for the default [[LeveldbReadJournal#Identifier]]. See `reference.conf`.
*/
@deprecated("Use another journal implementation", "2.6.15")
class LeveldbReadJournal(system: ExtendedActorSystem, config: Config)
extends ReadJournal
with PersistenceIdsQuery
with CurrentPersistenceIdsQuery
with EventsByPersistenceIdQuery
with CurrentEventsByPersistenceIdQuery
with EventsByTagQuery
with CurrentEventsByTagQuery {
private val refreshInterval = Some(config.getDuration("refresh-interval", MILLISECONDS).millis)
private val writeJournalPluginId: String = config.getString("write-plugin")
private val maxBufSize: Int = config.getInt("max-buffer-size")
private val resolvedWriteJournalPluginId =
if (writeJournalPluginId.isEmpty)
system.settings.config.getString("pekko.persistence.journal.plugin")
else
writeJournalPluginId
require(
resolvedWriteJournalPluginId.nonEmpty && system.settings.config
.getConfig(resolvedWriteJournalPluginId)
.getString("class") == "org.apache.pekko.persistence.journal.leveldb.LeveldbJournal",
s"Leveldb read journal can only work with a Leveldb write journal. Current plugin [$resolvedWriteJournalPluginId] is not a LeveldbJournal")
/**
* `persistenceIds` is used for retrieving all `persistenceIds` of all
* persistent actors.
*
* The returned event stream is unordered and you can expect a different order for multiple
* executions of the query.
*
* The stream is not completed when it reaches the end of the currently used `persistenceIds`,
* but it continues to push new `persistenceIds` when new persistent actors are created.
* The corresponding query that is completed when it reaches the end of the
* currently used `persistenceIds` is provided by [[#currentPersistenceIds]].
*
* The LevelDB write journal notifies the query side as soon as new `persistenceIds` are
* created; there is no periodic polling or batching involved in this query.
*
* The stream is completed with failure if there is a failure in executing the query in the
* backend journal.
*/
override def persistenceIds(): Source[String, NotUsed] =
// no polling for this query, the write journal will push all changes, i.e. no refreshInterval
Source
.fromMaterializer { (mat, _) =>
Source
.fromGraph(new AllPersistenceIdsStage(liveQuery = true, writeJournalPluginId, mat))
.named("allPersistenceIds")
}
.mapMaterializedValue(_ => NotUsed)
/**
* Same type of query as [[#persistenceIds]] but the stream
* is completed immediately when it reaches the end of the "result set". Persistent
* actors that are created after the query is completed are not included in the stream.
*/
override def currentPersistenceIds(): Source[String, NotUsed] =
Source
.fromMaterializer { (mat, _) =>
Source
.fromGraph(new AllPersistenceIdsStage(liveQuery = false, writeJournalPluginId, mat))
.named("allPersistenceIds")
}
.mapMaterializedValue(_ => NotUsed)
/**
* `eventsByPersistenceId` is used for retrieving events for a specific
* `PersistentActor` identified by `persistenceId`.
*
* You can retrieve a subset of all events by specifying `fromSequenceNr` and `toSequenceNr`
* or use `0L` and `Long.MaxValue` respectively to retrieve all events. Note that
* the corresponding sequence number of each event is provided in the
* [[pekko.persistence.query.EventEnvelope]], which makes it possible to resume the
* stream at a later point from a given sequence number.
*
* The returned event stream is ordered by sequence number, i.e. the same order as the
* `PersistentActor` persisted the events. The same prefix of stream elements (in the same order)
* is returned for multiple executions of the query, except when events have been deleted.
*
* The stream is not completed when it reaches the end of the currently stored events,
* but it continues to push new events when new events are persisted.
* The corresponding query that is completed when it reaches the end of the currently
* stored events is provided by [[#currentEventsByPersistenceId]].
*
* The LevelDB write journal notifies the query side as soon as events are persisted, but for
* efficiency reasons the query side retrieves the events in batches that can sometimes
* be delayed up to the configured `refresh-interval`.
*
* The stream is completed with failure if there is a failure in executing the query in the
* backend journal.
*/
override def eventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long = 0L,
toSequenceNr: Long = Long.MaxValue): Source[EventEnvelope, NotUsed] = {
Source
.fromMaterializer { (mat, _) =>
Source
.fromGraph(
new EventsByPersistenceIdStage(
persistenceId,
fromSequenceNr,
toSequenceNr,
maxBufSize,
writeJournalPluginId,
refreshInterval,
mat))
.named("eventsByPersistenceId-" + persistenceId)
}
.mapMaterializedValue(_ => NotUsed)
}
/**
* Same type of query as [[#eventsByPersistenceId]] but the event stream
* is completed immediately when it reaches the end of the "result set". Events that are
* stored after the query is completed are not included in the event stream.
*/
override def currentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long = 0L,
toSequenceNr: Long = Long.MaxValue): Source[EventEnvelope, NotUsed] = {
Source
.fromMaterializer { (mat, _) =>
Source
.fromGraph(
new EventsByPersistenceIdStage(
persistenceId,
fromSequenceNr,
toSequenceNr,
maxBufSize,
writeJournalPluginId,
None,
mat))
.named("currentEventsByPersistenceId-" + persistenceId)
}
.mapMaterializedValue(_ => NotUsed)
}
/**
* `eventsByTag` is used for retrieving events that were marked with
* a given tag, e.g. all events of an Aggregate Root type.
*
* To tag events you create an [[pekko.persistence.journal.EventAdapter]] that wraps the events
* in a [[pekko.persistence.journal.Tagged]] with the given `tags`.
*
* You can use `NoOffset` to retrieve all events with a given tag or retrieve a subset of all
* events by specifying a `Sequence` `offset`. The `offset` corresponds to an ordered sequence number for
* the specific tag. Note that the corresponding offset of each event is provided in the
* [[pekko.persistence.query.EventEnvelope]], which makes it possible to resume the
* stream at a later point from a given offset.
*
* The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included
* in the returned stream. This means that you can use the offset that is returned in `EventEnvelope`
* as the `offset` parameter in a subsequent query.
*
* In addition to the `offset` the `EventEnvelope` also provides `persistenceId` and `sequenceNr`
* for each event. The `sequenceNr` is the sequence number for the persistent actor with the
* `persistenceId` that persisted the event. The `persistenceId` + `sequenceNr` is a unique
* identifier for the event.
*
* The returned event stream is ordered by the offset (tag sequence number), which corresponds
* to the same order as the write journal stored the events. The same stream elements (in the same order)
* are returned for multiple executions of the query. Deleted events are not removed from the
* tagged event stream.
*
* The stream is not completed when it reaches the end of the currently stored events,
* but it continues to push new events when new events are persisted.
* The corresponding query that is completed when it reaches the end of the currently
* stored events is provided by [[#currentEventsByTag]].
*
* The LevelDB write journal notifies the query side as soon as tagged events are persisted, but for
* efficiency reasons the query side retrieves the events in batches that can sometimes
* be delayed up to the configured `refresh-interval`.
*
* The stream is completed with failure if there is a failure in executing the query in the
* backend journal.
*/
override def eventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] =
Source
.fromMaterializer { (mat, _) =>
offset match {
case seq: Sequence =>
Source
.fromGraph(
new EventsByTagStage(
tag,
seq.value,
maxBufSize,
Long.MaxValue,
writeJournalPluginId,
refreshInterval,
mat))
.named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8))
case NoOffset => eventsByTag(tag, Sequence(0L)) // recursive
case _ =>
throw new IllegalArgumentException(
"LevelDB does not support " + Logging.simpleName(offset.getClass) + " offsets")
}
}
.mapMaterializedValue(_ => NotUsed)
/**
* Same type of query as [[#eventsByTag]] but the event stream
* is completed immediately when it reaches the end of the "result set". Events that are
* stored after the query is completed are not included in the event stream.
*/
override def currentEventsByTag(tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] =
Source
.fromMaterializer { (mat, _) =>
offset match {
case seq: Sequence =>
Source
.fromGraph(
new EventsByTagStage(tag, seq.value, maxBufSize, Long.MaxValue, writeJournalPluginId, None, mat))
.named("currentEventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8))
case NoOffset => currentEventsByTag(tag, Sequence(0L))
case _ =>
throw new IllegalArgumentException(
"LevelDB does not support " + Logging.simpleName(offset.getClass) + " offsets")
}
}
.mapMaterializedValue(_ => NotUsed)
}
object LeveldbReadJournal {
/**
* The default identifier for [[LeveldbReadJournal]] to be used with
* [[pekko.persistence.query.PersistenceQuery#readJournalFor]].
*
* The value is `"pekko.persistence.query.journal.leveldb"` and corresponds
* to the absolute path to the read journal configuration entry.
*/
final val Identifier = "pekko.persistence.query.journal.leveldb"
}
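
A minimal sketch of the resume-from-offset pattern described in the scaladoc above; the system name and tag are placeholders:

{{{
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.query.{ PersistenceQuery, Sequence }
import pekko.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal

// hypothetical actor system; also serves as the implicit materializer
implicit val system: ActorSystem = ActorSystem("example")
val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)

// Start from Sequence(0L); to resume after a restart, pass the last offset
// observed in an EventEnvelope instead (the offset is exclusive).
queries
  .eventsByTag("my-tag", Sequence(0L))
  .runForeach(env => println(s"${env.persistenceId} #${env.sequenceNr}: ${env.event}"))
}}}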

View file

@ -0,0 +1,28 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.scaladsl
import org.apache.pekko
import pekko.NotUsed
import pekko.persistence.query.EventEnvelope
import pekko.stream.scaladsl.Source
/**
* A plugin may optionally support this query by implementing this trait.
*/
trait CurrentEventsByPersistenceIdQuery extends ReadJournal {
/**
* Same type of query as [[EventsByPersistenceIdQuery#eventsByPersistenceId]]
* but the event stream is completed immediately when it reaches the end of
* the "result set". Events that are stored after the query is completed are
* not included in the event stream.
*/
def currentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Source[EventEnvelope, NotUsed]
}

View file

@ -0,0 +1,27 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.scaladsl
import org.apache.pekko
import pekko.NotUsed
import pekko.persistence.query.{ EventEnvelope, Offset }
import pekko.stream.scaladsl.Source
/**
* A plugin may optionally support this query by implementing this trait.
*/
trait CurrentEventsByTagQuery extends ReadJournal {
/**
* Same type of query as [[EventsByTagQuery#eventsByTag]] but the event stream
* is completed immediately when it reaches the end of the "result set". Depending
* on journal implementation, this may mean all events up to when the query is
* started, or it may include events that are persisted while the query is still
* streaming results. For eventually consistent stores, it may only include all
* events up to some point before the query is started.
*/
def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
}

View file

@ -0,0 +1,23 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.scaladsl
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Source
/**
* A plugin may optionally support this query by implementing this trait.
*/
trait CurrentPersistenceIdsQuery extends ReadJournal {
/**
* Same type of query as [[PersistenceIdsQuery#persistenceIds]] but the stream
* is completed immediately when it reaches the end of the "result set". Persistent
* actors that are created after the query is completed are not included in the stream.
*/
def currentPersistenceIds(): Source[String, NotUsed]
}

View file

@ -0,0 +1,30 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.scaladsl
import org.apache.pekko
import pekko.NotUsed
import pekko.persistence.state.scaladsl.DurableStateStore
import pekko.stream.scaladsl.Source
/**
* A DurableStateStore may optionally support this query by implementing this trait.
*/
trait DurableStateStorePagedPersistenceIdsQuery[A] extends DurableStateStore[A] {
/**
* Get the current persistence ids.
*
* Not all plugins may support in-database paging, and may simply use drop/take Pekko Streams operators
* to manipulate the result set according to the paging parameters.
*
* @param afterId The ID to start returning results from, or [[None]] to return all ids. This should be an id
* returned from a previous invocation of this command. Callers should not assume that ids are
* returned in sorted order.
* @param limit The maximum results to return. Use Long.MaxValue to return all results. Must be greater than zero.
* @return A source containing all the persistence ids, limited as specified.
*/
def currentPersistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed]
}
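
A hedged sketch of paging through the ids with this query; the `store` instance, the `String` element type and the page size are assumptions for illustration:

{{{
import scala.concurrent.Future
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.query.scaladsl.DurableStateStorePagedPersistenceIdsQuery
import pekko.stream.scaladsl.Sink

implicit val system: ActorSystem = ActorSystem("example") // hypothetical
import system.dispatcher

// `store` is assumed to be obtained from a concrete DurableStateStore plugin.
def firstTwoPages(store: DurableStateStorePagedPersistenceIdsQuery[String]): Future[Seq[String]] =
  for {
    page1 <- store.currentPersistenceIds(afterId = None, limit = 100L).runWith(Sink.seq)
    // feed the last id of the previous page back in to get the next page
    page2 <- store.currentPersistenceIds(afterId = page1.lastOption, limit = 100L).runWith(Sink.seq)
  } yield page1 ++ page2
}}}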

View file

@ -0,0 +1,62 @@
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.scaladsl
import org.apache.pekko
import pekko.NotUsed
import pekko.persistence.query.DurableStateChange
import pekko.persistence.state.scaladsl.DurableStateStore
import pekko.persistence.query.Offset
import pekko.stream.scaladsl.Source
/**
* Query API for reading durable state objects.
*
* For Java API see [[pekko.persistence.query.javadsl.DurableStateStoreQuery]].
*/
trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
/**
* Get a source of the most recent changes made to objects with the given tag since the passed in offset.
*
* Note that this only returns the most recent change to each object, if an object has been updated multiple times
* since the offset, only the most recent of those changes will be part of the stream.
*
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results.
*
* The [[DurableStateChange]] elements can be [[pekko.persistence.query.UpdatedDurableState]] or
* [[pekko.persistence.query.DeletedDurableState]].
*
* @param tag The tag to get changes for.
* @param offset The offset to get changes since. Must either be [[pekko.persistence.query.NoOffset]] to get
* changes since the beginning of time, or an offset that has been previously returned by this query.
* Any other offsets are invalid.
* @return A source of change in state.
*/
def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]
/**
* Get a source of the most recent changes made to objects of the given tag since the passed in offset.
*
* The returned source will never terminate, it effectively watches for changes to the objects and emits changes as
* they happen.
*
* Not all changes that occur are guaranteed to be emitted, this call only guarantees that eventually, the most
* recent change for each object since the offset will be emitted. In particular, multiple updates to a given object
* in quick succession are likely to be skipped, with only the last update resulting in a change from this
* source.
*
* The [[DurableStateChange]] elements can be [[pekko.persistence.query.UpdatedDurableState]] or
* [[pekko.persistence.query.DeletedDurableState]].
*
* @param tag The tag to get changes for.
* @param offset The offset to get changes since. Must either be [[pekko.persistence.query.NoOffset]] to get
* changes since the beginning of time, or an offset that has been previously returned by this query.
* Any other offsets are invalid.
* @return A source of change in state.
*/
def changes(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]
}
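
A sketch of consuming the live `changes` stream, assuming a plugin-provided `store` and a placeholder tag:

{{{
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.query.{ NoOffset, UpdatedDurableState }
import pekko.persistence.query.scaladsl.DurableStateStoreQuery

implicit val system: ActorSystem = ActorSystem("example") // hypothetical

// `store` is assumed to come from a plugin that supports this query
def watch(store: DurableStateStoreQuery[String]): Unit =
  store
    .changes("my-tag", NoOffset) // placeholder tag; NoOffset = from the beginning
    .runForeach {
      case u: UpdatedDurableState[String] =>
        println(s"${u.persistenceId} rev ${u.revision} -> ${u.value}")
      case other =>
        println(s"deleted or other change: ${other.persistenceId}")
    }
}}}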

View file

@ -0,0 +1,36 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.scaladsl
import org.apache.pekko
import pekko.NotUsed
import pekko.persistence.query.EventEnvelope
import pekko.stream.scaladsl.Source
/**
* A plugin may optionally support this query by implementing this trait.
*/
trait EventsByPersistenceIdQuery extends ReadJournal {
/**
* Query events for a specific `PersistentActor` identified by `persistenceId`.
*
* You can retrieve a subset of all events by specifying `fromSequenceNr` and `toSequenceNr`
* or use `0L` and `Long.MaxValue` respectively to retrieve all events. The query will
* return all the events inclusive of the `fromSequenceNr` and `toSequenceNr` values.
*
* The returned event stream should be ordered by sequence number.
*
* The stream is not completed when it reaches the end of the currently stored events,
* but it continues to push new events when new events are persisted.
* The corresponding query that is completed when it reaches the end of the currently
* stored events is provided by [[CurrentEventsByPersistenceIdQuery#currentEventsByPersistenceId]].
*/
def eventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Source[EventEnvelope, NotUsed]
}
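
A usage sketch, assuming a journal instance implementing this trait and a hypothetical persistence id:

{{{
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.query.scaladsl.EventsByPersistenceIdQuery

// live replay of everything persisted by "sample-id-1" (hypothetical id);
// the stream keeps running and emits new events as they are persisted
def replay(journal: EventsByPersistenceIdQuery)(implicit system: ActorSystem): Unit =
  journal
    .eventsByPersistenceId("sample-id-1", 0L, Long.MaxValue)
    .runForeach(env => println(s"#${env.sequenceNr}: ${env.event}"))
}}}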

View file

@ -0,0 +1,49 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.scaladsl
import org.apache.pekko
import pekko.NotUsed
import pekko.persistence.query.{ EventEnvelope, Offset }
import pekko.stream.scaladsl.Source
/**
* A plugin may optionally support this query by implementing this trait.
*/
trait EventsByTagQuery extends ReadJournal {
/**
* Query events that have a specific tag. A tag can for example correspond to an
* aggregate root type (in DDD terminology).
*
* The consumer can keep track of its current position in the event stream by storing the
* `offset` and restart the query from a given `offset` after a crash/restart.
*
* The exact meaning of the `offset` depends on the journal and must be documented by the
* read journal plugin. It may be a sequential id number that uniquely identifies the
* position of each event within the event stream. Distributed data stores cannot easily
* support those semantics and they may use a weaker meaning. For example it may be a
* timestamp (taken when the event was created or stored). Timestamps are not unique and
* not strictly ordered, since clocks on different machines may not be synchronized.
*
* In strongly consistent stores, where the `offset` is unique and strictly ordered, the
* stream should start from the next event after the `offset`. Otherwise, the read journal
* should ensure that between an invocation that returned an event with the given
* `offset`, and this invocation, no events are missed. Depending on the journal
* implementation, this may mean that this invocation will return events that were already
* returned by the previous invocation, including the event with the passed in `offset`.
*
* The returned event stream should be ordered by `offset` if possible, but this can also be
* difficult to fulfill for a distributed data store. The order must be documented by the
* read journal plugin.
*
* The stream is not completed when it reaches the end of the currently stored events,
* but it continues to push new events when new events are persisted.
* The corresponding query that is completed when it reaches the end of the currently
* stored events is provided by [[CurrentEventsByTagQuery#currentEventsByTag]].
*/
def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
}
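
A sketch of the offset-checkpointing pattern implied above: process each envelope, then persist its offset so the query can be resumed after a crash. `saveOffset` and the tag are illustrative assumptions:

{{{
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.query.{ NoOffset, Offset }
import pekko.persistence.query.scaladsl.EventsByTagQuery

def run(journal: EventsByTagQuery, saveOffset: Offset => Unit)(
    implicit system: ActorSystem): Unit =
  journal
    .eventsByTag("order-placed", NoOffset) // placeholder tag; NoOffset = from the beginning
    .runForeach { env =>
      println(env.event) // stand-in for real event handling
      saveOffset(env.offset) // checkpoint after successful processing
    }
}}}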

View file

@ -0,0 +1,29 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.scaladsl
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Source
/**
* A plugin ReadJournal may optionally support this query by implementing this trait.
*/
trait PagedPersistenceIdsQuery extends ReadJournal {
/**
* Get the current persistence ids.
*
* Not all plugins may support in-database paging, and may simply use drop/take Pekko Streams operators
* to manipulate the result set according to the paging parameters.
*
* @param afterId The ID to start returning results from, or [[None]] to return all ids. This should be an id
* returned from a previous invocation of this command. Callers should not assume that ids are
* returned in sorted order.
* @param limit The maximum results to return. Use Long.MaxValue to return all results. Must be greater than zero.
* @return A source containing all the persistence ids, limited as specified.
*/
def currentPersistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed]
}

View file

@ -0,0 +1,27 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.scaladsl
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Source
/**
* A plugin may optionally support this query by implementing this trait.
*/
trait PersistenceIdsQuery extends ReadJournal {
/**
* Query all `PersistentActor` identifiers, i.e. as defined by the
* `persistenceId` of the `PersistentActor`.
*
* The stream is not completed when it reaches the end of the currently used `persistenceIds`,
* but it continues to push new `persistenceIds` when new persistent actors are created.
* The corresponding query that is completed when it reaches the end of the
* currently used `persistenceIds` is provided by [[CurrentPersistenceIdsQuery#currentPersistenceIds]].
*/
def persistenceIds(): Source[String, NotUsed]
}
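
A small usage sketch, assuming a journal instance; `take(10)` turns the otherwise infinite live stream into a finite one:

{{{
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.query.scaladsl.PersistenceIdsQuery

def firstTen(journal: PersistenceIdsQuery)(implicit system: ActorSystem): Unit =
  journal.persistenceIds().take(10).runForeach(println) // completes after ten ids
}}}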

View file

@ -0,0 +1,28 @@
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.scaladsl
/**
* API for reading persistent events and information derived
* from stored persistent events.
*
* The purpose of the API is not to enforce compatibility between different
* journal implementations, because the technical capabilities may be very different.
* The interface is very open so that different journals may implement specific queries.
*
* There are a few pre-defined queries that a query implementation may implement,
* such as [[EventsByPersistenceIdQuery]], [[PersistenceIdsQuery]] and [[EventsByTagQuery]].
* Implementations of these queries are optional, and query (journal) plugins may define
* their own specialized queries by implementing other methods.
*
* Usage:
* {{{
* val journal = PersistenceQuery(system).readJournalFor[SomeCoolReadJournal](queryPluginConfigPath)
* val events = journal.query(EventsByTag("mytag", 0L))
* }}}
*
* For Java API see [[org.apache.pekko.persistence.query.javadsl.ReadJournal]].
*/
trait ReadJournal

View file

@ -0,0 +1,117 @@
/*
* Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.typed
import java.util.Optional
import org.apache.pekko
import pekko.annotation.ApiMayChange
import pekko.persistence.query.Offset
import pekko.util.HashCode
object EventEnvelope {
def apply[Event](
offset: Offset,
persistenceId: String,
sequenceNr: Long,
event: Event,
timestamp: Long,
entityType: String,
slice: Int): EventEnvelope[Event] =
new EventEnvelope(offset, persistenceId, sequenceNr, Option(event), timestamp, None, entityType, slice)
def create[Event](
offset: Offset,
persistenceId: String,
sequenceNr: Long,
event: Event,
timestamp: Long,
entityType: String,
slice: Int): EventEnvelope[Event] =
apply(offset, persistenceId, sequenceNr, event, timestamp, entityType, slice)
def unapply[Event](arg: EventEnvelope[Event]): Option[(Offset, String, Long, Option[Event], Long)] =
Some((arg.offset, arg.persistenceId, arg.sequenceNr, arg.eventOption, arg.timestamp))
}
/**
* Event wrapper adding meta data for the events in the result stream of
* [[pekko.persistence.query.typed.scaladsl.EventsBySliceQuery]] query, or similar queries.
*
* If the `event` is not defined it has not been loaded yet. It can be loaded with
* [[pekko.persistence.query.typed.scaladsl.LoadEventQuery]].
*
* The `timestamp` is the time the event was stored, in milliseconds since midnight, January 1, 1970 UTC (same as
* `System.currentTimeMillis`).
*
* It is an improved `EventEnvelope` compared to [[pekko.persistence.query.EventEnvelope]].
*
* API May Change
*/
@ApiMayChange
final class EventEnvelope[Event](
val offset: Offset,
val persistenceId: String,
val sequenceNr: Long,
val eventOption: Option[Event],
val timestamp: Long,
val eventMetadata: Option[Any],
val entityType: String,
val slice: Int) {
def event: Event =
eventOption match {
case Some(evt) => evt
case None =>
throw new IllegalStateException(
"Event was not loaded. Use eventOption and load the event on demand with LoadEventQuery.")
}
/**
* Java API
*/
def getEvent(): Event =
eventOption match {
case Some(evt) => evt
case None =>
throw new IllegalStateException(
"Event was not loaded. Use getOptionalEvent and load the event on demand with LoadEventQuery.")
}
/**
* Java API
*/
def getOptionalEvent(): Optional[Event] = {
import scala.compat.java8.OptionConverters._
eventOption.asJava
}
/**
* Java API
*/
def getEventMetaData(): Optional[AnyRef] = {
import scala.compat.java8.OptionConverters._
eventMetadata.map(_.asInstanceOf[AnyRef]).asJava
}
override def hashCode(): Int = {
var result = HashCode.SEED
result = HashCode.hash(result, offset)
result = HashCode.hash(result, persistenceId)
result = HashCode.hash(result, sequenceNr)
result
}
override def equals(obj: Any): Boolean = obj match {
case other: EventEnvelope[_] =>
offset == other.offset && persistenceId == other.persistenceId && sequenceNr == other.sequenceNr &&
eventOption == other.eventOption && timestamp == other.timestamp && eventMetadata == other.eventMetadata &&
entityType == other.entityType && slice == other.slice
case _ => false
}
override def toString: String =
s"EventEnvelope($offset,$persistenceId,$sequenceNr,$eventOption,$timestamp,$eventMetadata,$entityType,$slice)"
}
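
A construction sketch with illustrative values only:

{{{
import org.apache.pekko
import pekko.persistence.query.Sequence
import pekko.persistence.query.typed.EventEnvelope

val env = EventEnvelope[String](
  offset = Sequence(1L),
  persistenceId = "Order|o-1", // hypothetical "entityType|id" persistence id
  sequenceNr = 1L,
  event = "OrderPlaced",
  timestamp = System.currentTimeMillis(),
  entityType = "Order",
  slice = 42)

env.eventOption // Some("OrderPlaced"); `event` would throw if the event was not loaded
}}}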

View file

@ -0,0 +1,39 @@
/*
* Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.typed.javadsl
import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.ApiMayChange
import pekko.japi.Pair
import pekko.persistence.query.Offset
import pekko.persistence.query.javadsl.ReadJournal
import pekko.persistence.query.typed.EventEnvelope
import pekko.stream.javadsl.Source
/**
* A plugin may optionally support this query by implementing this trait.
*
* API May Change
*/
@ApiMayChange
trait CurrentEventsBySliceQuery extends ReadJournal {
/**
* Same type of query as [[EventsBySliceQuery.eventsBySlices]] but the event stream is completed immediately when it
* reaches the end of the "result set". Depending on journal implementation, this may mean all events up to when the
* query is started, or it may include events that are persisted while the query is still streaming results. For
* eventually consistent stores, it may only include all events up to some point before the query is started.
*/
def currentEventsBySlices[Event](
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[EventEnvelope[Event], NotUsed]
def sliceForPersistenceId(persistenceId: String): Int
def sliceRanges(numberOfRanges: Int): java.util.List[Pair[Integer, Integer]]
}

View file

@ -0,0 +1,73 @@
/*
* Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.typed.javadsl
import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.ApiMayChange
import pekko.japi.Pair
import pekko.persistence.query.DurableStateChange
import pekko.persistence.query.Offset
import pekko.persistence.state.javadsl.DurableStateStore
import pekko.stream.javadsl.Source
/**
* Query API for reading durable state objects.
*
* For the Scala API see [[pekko.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery]].
*
* API May Change
*/
@ApiMayChange
trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {
/**
* Get a source of the most recent changes made to objects with the given slice range since the passed in offset.
*
* A slice is deterministically defined based on the persistence id. The purpose is to evenly distribute all
* persistence ids over the slices.
*
* Note that this only returns the most recent change to each object, if an object has been updated multiple times
* since the offset, only the most recent of those changes will be part of the stream.
*
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results.
*
* The [[DurableStateChange]] elements can be [[pekko.persistence.query.UpdatedDurableState]] or
* [[pekko.persistence.query.DeletedDurableState]].
*/
def currentChangesBySlices(
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[DurableStateChange[A], NotUsed]
/**
* Get a source of the most recent changes made to objects of the given slice range since the passed in offset.
*
* A slice is deterministically defined based on the persistence id. The purpose is to evenly distribute all
* persistence ids over the slices.
*
* The returned source will never terminate, it effectively watches for changes to the objects and emits changes as
* they happen.
*
* Not all changes that occur are guaranteed to be emitted, this call only guarantees that eventually, the most recent
* change for each object since the offset will be emitted. In particular, multiple updates to a given object in quick
* succession are likely to be skipped, with only the last update resulting in a change from this source.
*
* The [[DurableStateChange]] elements can be [[pekko.persistence.query.UpdatedDurableState]] or
* [[pekko.persistence.query.DeletedDurableState]].
*/
def changesBySlices(
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[DurableStateChange[A], NotUsed]
def sliceForPersistenceId(persistenceId: String): Int
def sliceRanges(numberOfRanges: Int): java.util.List[Pair[Integer, Integer]]
}

View file

@ -0,0 +1,25 @@
/*
* Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.typed.javadsl
import java.time.Instant
import java.util.Optional
import java.util.concurrent.CompletionStage
import org.apache.pekko
import pekko.annotation.ApiMayChange
import pekko.persistence.query.javadsl.ReadJournal
/**
* An [[EventsBySliceQuery]] that is using a timestamp-based offset should also implement this query.
*
* API May Change
*/
@ApiMayChange
trait EventTimestampQuery extends ReadJournal {
def timestampOf(persistenceId: String, sequenceNr: Long): CompletionStage[Optional[Instant]]
}

View file

@ -0,0 +1,63 @@
/*
* Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.typed.javadsl
import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.ApiMayChange
import pekko.japi.Pair
import pekko.persistence.query.Offset
import pekko.persistence.query.javadsl.ReadJournal
import pekko.persistence.query.typed.EventEnvelope
import pekko.stream.javadsl.Source
/**
* A plugin may optionally support this query by implementing this trait.
*
* An `EventsBySliceQuery` that is using a timestamp-based offset should also implement [[EventTimestampQuery]] and
* [[LoadEventQuery]].
*
* API May Change
*/
@ApiMayChange
trait EventsBySliceQuery extends ReadJournal {
/**
* Query events for given slices. A slice is deterministically defined based on the persistence id. The purpose is to
* evenly distribute all persistence ids over the slices.
*
* The consumer can keep track of its current position in the event stream by storing the `offset` and restart the
* query from a given `offset` after a crash/restart.
*
* The exact meaning of the `offset` depends on the journal and must be documented by the read journal plugin. It may
* be a sequential id number that uniquely identifies the position of each event within the event stream. Distributed
* data stores cannot easily support those semantics and they may use a weaker meaning. For example it may be a
* timestamp (taken when the event was created or stored). Timestamps are not unique and not strictly ordered, since
* clocks on different machines may not be synchronized.
*
* In strongly consistent stores, where the `offset` is unique and strictly ordered, the stream should start from the
* next event after the `offset`. Otherwise, the read journal should ensure that between an invocation that returned
* an event with the given `offset`, and this invocation, no events are missed. Depending on the journal
* implementation, this may mean that this invocation will return events that were already returned by the previous
* invocation, including the event with the passed in `offset`.
*
* The returned event stream should be ordered by `offset` if possible, but this can also be difficult to fulfill for
* a distributed data store. The order must be documented by the read journal plugin.
*
* The stream is not completed when it reaches the end of the currently stored events, but it continues to push new
* events when new events are persisted. The corresponding query that is completed when it reaches the end of the
* currently stored events is provided by [[CurrentEventsBySliceQuery.currentEventsBySlices]].
*/
def eventsBySlices[Event](
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[EventEnvelope[Event], NotUsed]
def sliceForPersistenceId(persistenceId: String): Int
def sliceRanges(numberOfRanges: Int): java.util.List[Pair[Integer, Integer]]
}

View file

@ -0,0 +1,27 @@
/*
* Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.typed.javadsl
import java.util.concurrent.CompletionStage
import org.apache.pekko
import pekko.annotation.ApiMayChange
import pekko.persistence.query.javadsl.ReadJournal
import pekko.persistence.query.typed.EventEnvelope
/**
* An [[EventsBySliceQuery]] that is using a timestamp-based offset should also implement this query.
*
* API May Change
*/
@ApiMayChange
trait LoadEventQuery extends ReadJournal {
/**
* Load a single event on demand. The `CompletionStage` is completed with a `NoSuchElementException` if
* the event for the given `persistenceId` and `sequenceNr` doesn't exist.
*/
def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): CompletionStage[EventEnvelope[Event]]
}

View file

@ -0,0 +1,40 @@
/*
* Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.typed.scaladsl
import scala.collection.immutable
import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.ApiMayChange
import pekko.persistence.query.Offset
import pekko.persistence.query.scaladsl.ReadJournal
import pekko.persistence.query.typed.EventEnvelope
import pekko.stream.scaladsl.Source
/**
* A plugin may optionally support this query by implementing this trait.
*
* API May Change
*/
@ApiMayChange
trait CurrentEventsBySliceQuery extends ReadJournal {
/**
* Same type of query as [[EventsBySliceQuery.eventsBySlices]] but the event stream is completed immediately when it
* reaches the end of the "result set". Depending on journal implementation, this may mean all events up to when the
* query is started, or it may include events that are persisted while the query is still streaming results. For
* eventually consistent stores, it may only include all events up to some point before the query is started.
*/
def currentEventsBySlices[Event](
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[EventEnvelope[Event], NotUsed]
def sliceForPersistenceId(persistenceId: String): Int
def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
}

View file

@ -0,0 +1,74 @@
/*
* Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.typed.scaladsl
import scala.collection.immutable
import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.ApiMayChange
import pekko.persistence.query.DurableStateChange
import pekko.persistence.query.Offset
import pekko.persistence.state.scaladsl.DurableStateStore
import pekko.stream.scaladsl.Source
/**
* Query API for reading durable state objects.
*
* For the Java API see [[pekko.persistence.query.typed.javadsl.DurableStateStoreBySliceQuery]].
*
* API May Change
*/
@ApiMayChange
trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {
/**
* Get a source of the most recent changes made to objects with the given slice range since the passed in offset.
*
* A slice is deterministically defined based on the persistence id. The purpose is to evenly distribute all
* persistence ids over the slices.
*
* Note that this only returns the most recent change to each object, if an object has been updated multiple times
* since the offset, only the most recent of those changes will be part of the stream.
*
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results.
*
* The [[DurableStateChange]] elements can be [[pekko.persistence.query.UpdatedDurableState]] or
* [[pekko.persistence.query.DeletedDurableState]].
*/
def currentChangesBySlices(
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[DurableStateChange[A], NotUsed]
/**
* Get a source of the most recent changes made to objects of the given slice range since the passed in offset.
*
* A slice is deterministically defined based on the persistence id. The purpose is to evenly distribute all
* persistence ids over the slices.
*
* The returned source will never terminate, it effectively watches for changes to the objects and emits changes as
* they happen.
*
* Not all changes that occur are guaranteed to be emitted, this call only guarantees that eventually, the most recent
* change for each object since the offset will be emitted. In particular, multiple updates to a given object in quick
* succession are likely to be skipped, with only the last update resulting in a change from this source.
*
* The [[DurableStateChange]] elements can be [[pekko.persistence.query.UpdatedDurableState]] or
* [[pekko.persistence.query.DeletedDurableState]].
*/
def changesBySlices(
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[DurableStateChange[A], NotUsed]
def sliceForPersistenceId(persistenceId: String): Int
def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
}

View file

@ -0,0 +1,25 @@
/*
* Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.typed.scaladsl
import java.time.Instant
import scala.concurrent.Future
import org.apache.pekko
import pekko.annotation.ApiMayChange
import pekko.persistence.query.scaladsl.ReadJournal
/**
* An [[EventsBySliceQuery]] that is using a timestamp-based offset should also implement this query.
*
* API May Change
*/
@ApiMayChange
trait EventTimestampQuery extends ReadJournal {
def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]]
}

View file

@ -0,0 +1,64 @@
/*
* Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.typed.scaladsl
import scala.collection.immutable
import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.ApiMayChange
import pekko.persistence.query.Offset
import pekko.persistence.query.scaladsl.ReadJournal
import pekko.persistence.query.typed.EventEnvelope
import pekko.stream.scaladsl.Source
/**
* A plugin may optionally support this query by implementing this trait.
*
* An `EventsBySliceQuery` that is using a timestamp-based offset should also implement [[EventTimestampQuery]] and
* [[LoadEventQuery]].
*
* API May Change
*/
@ApiMayChange
trait EventsBySliceQuery extends ReadJournal {
/**
* Query events for given slices. A slice is deterministically defined based on the persistence id. The purpose is to
* evenly distribute all persistence ids over the slices.
*
* The consumer can keep track of its current position in the event stream by storing the `offset` and restart the
* query from a given `offset` after a crash/restart.
*
* The exact meaning of the `offset` depends on the journal and must be documented by the read journal plugin. It may
* be a sequential id number that uniquely identifies the position of each event within the event stream. Distributed
* data stores cannot easily support those semantics and they may use a weaker meaning. For example it may be a
* timestamp (taken when the event was created or stored). Timestamps are not unique and not strictly ordered, since
* clocks on different machines may not be synchronized.
*
* In strongly consistent stores, where the `offset` is unique and strictly ordered, the stream should start from the
* next event after the `offset`. Otherwise, the read journal should ensure that between an invocation that returned
* an event with the given `offset`, and this invocation, no events are missed. Depending on the journal
* implementation, this may mean that this invocation will return events that were already returned by the previous
* invocation, including the event with the passed in `offset`.
*
* The returned event stream should be ordered by `offset` if possible, but this can also be difficult to fulfill for
* a distributed data store. The order must be documented by the read journal plugin.
*
* The stream is not completed when it reaches the end of the currently stored events, but it continues to push new
* events when new events are persisted. The corresponding query that is completed when it reaches the end of the
* currently stored events is provided by [[CurrentEventsBySliceQuery.currentEventsBySlices]].
*/
def eventsBySlices[Event](
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[EventEnvelope[Event], NotUsed]
def sliceForPersistenceId(persistenceId: String): Int
def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
}
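
A sketch of splitting the slice space over several consumers, as the scaladoc above describes; the journal instance, entity type and number of ranges are assumptions:

{{{
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.query.NoOffset
import pekko.persistence.query.typed.scaladsl.EventsBySliceQuery

def consumeFirstRange(journal: EventsBySliceQuery)(implicit system: ActorSystem): Unit = {
  val ranges = journal.sliceRanges(numberOfRanges = 4) // e.g. 4 projection instances
  val range = ranges.head // this instance's share; the other ranges are handled elsewhere
  journal
    .eventsBySlices[String]("Order", range.min, range.max, NoOffset) // placeholder entity type
    .runForeach(env => println(s"${env.persistenceId}: ${env.eventOption}"))
}
}}}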

View file

@ -0,0 +1,27 @@
/*
* Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.typed.scaladsl
import scala.concurrent.Future
import org.apache.pekko
import pekko.annotation.ApiMayChange
import pekko.persistence.query.scaladsl.ReadJournal
import pekko.persistence.query.typed.EventEnvelope
/**
* [[EventsBySliceQuery]] that is using a timestamp based offset should also implement this query.
*
* API May Change
*/
@ApiMayChange
trait LoadEventQuery extends ReadJournal {
/**
* Load a single event on demand. The `Future` is completed with a `NoSuchElementException` if
* the event for the given `persistenceId` and `sequenceNr` doesn't exist.
*/
def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): Future[EventEnvelope[Event]]
}
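
A sketch combining [[EventTimestampQuery]] and [[LoadEventQuery]] to inspect a single event on demand; the journal value and the ids are assumptions:

{{{
import scala.concurrent.Future
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.query.typed.scaladsl.{ EventTimestampQuery, LoadEventQuery }

def inspect(journal: EventTimestampQuery with LoadEventQuery)(
    implicit system: ActorSystem): Future[Unit] = {
  import system.dispatcher
  for {
    ts <- journal.timestampOf("Order|o-1", sequenceNr = 1L) // hypothetical persistence id
    env <- journal.loadEnvelope[String]("Order|o-1", sequenceNr = 1L)
  } yield println(s"stored at $ts: ${env.event}")
}
}}}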

View file

@ -0,0 +1,36 @@
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query;
import java.util.Iterator;
import org.apache.pekko.NotUsed;
import org.apache.pekko.persistence.query.javadsl.PersistenceIdsQuery;
import org.apache.pekko.persistence.query.javadsl.ReadJournal;
import org.apache.pekko.stream.javadsl.Source;
/** Use for tests only! Emits an infinite stream of strings (representing the events queried for). */
public class DummyJavaReadJournal implements ReadJournal, PersistenceIdsQuery {
public static final String Identifier = "pekko.persistence.query.journal.dummy-java";
@Override
public Source<String, NotUsed> persistenceIds() {
return Source.fromIterator(
() ->
new Iterator<String>() {
private int i = 0;
@Override
public boolean hasNext() {
return true;
}
@Override
public String next() {
return "" + (i++);
}
});
}
}

View file

@ -0,0 +1,26 @@
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query;
import org.apache.pekko.NotUsed;
/** Use for tests only! Emits an infinite stream of strings (representing the events queried for). */
public class DummyJavaReadJournalForScala
implements org.apache.pekko.persistence.query.scaladsl.ReadJournal,
org.apache.pekko.persistence.query.scaladsl.PersistenceIdsQuery {
public static final String Identifier = DummyJavaReadJournal.Identifier;
private final DummyJavaReadJournal readJournal;
public DummyJavaReadJournalForScala(DummyJavaReadJournal readJournal) {
this.readJournal = readJournal;
}
@Override
public org.apache.pekko.stream.scaladsl.Source<String, NotUsed> persistenceIds() {
return readJournal.persistenceIds().asScala();
}
}

View file

@ -0,0 +1,32 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
public class DummyJavaReadJournalProvider implements ReadJournalProvider {
public static final Config config =
ConfigFactory.parseString(
DummyJavaReadJournal.Identifier
+ " { \n"
+ " class = \""
+ DummyJavaReadJournalProvider.class.getCanonicalName()
+ "\" \n"
+ " }\n\n");
private final DummyJavaReadJournal readJournal = new DummyJavaReadJournal();
@Override
public DummyJavaReadJournalForScala scaladslReadJournal() {
return new DummyJavaReadJournalForScala(readJournal);
}
@Override
public DummyJavaReadJournal javadslReadJournal() {
return readJournal;
}
}

View file

@ -0,0 +1,28 @@
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
import org.junit.ClassRule;
public class PersistenceQueryTest {
@ClassRule
public static PekkoJUnitActorSystemResource actorSystemResource =
new PekkoJUnitActorSystemResource(PersistenceQueryTest.class.getName());
private final ActorSystem system = actorSystemResource.getSystem();
// compile-only test
@SuppressWarnings("unused")
public void shouldExposeJavaDSLFriendlyQueryJournal() throws Exception {
final DummyJavaReadJournal readJournal =
PersistenceQuery.get(system).getReadJournalFor(DummyJavaReadJournal.class, "noop-journal");
final org.apache.pekko.stream.javadsl.Source<String, NotUsed> ids =
readJournal.persistenceIds();
}
}

View file

@ -0,0 +1,80 @@
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query
import com.typesafe.config.{ Config, ConfigFactory }
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.ExtendedActorSystem
import pekko.stream.scaladsl.Source
import pekko.util.unused
/**
* Use for tests only!
* Emits an infinite stream of strings (representing the events queried for).
*/
class DummyReadJournal(val dummyValue: String) extends scaladsl.ReadJournal with scaladsl.PersistenceIdsQuery {
override def persistenceIds(): Source[String, NotUsed] =
Source.fromIterator(() => Iterator.from(0)).map(_.toString)
}
object DummyReadJournal {
final val Identifier = "pekko.persistence.query.journal.dummy"
}
class DummyReadJournalForJava(readJournal: DummyReadJournal)
extends javadsl.ReadJournal
with javadsl.PersistenceIdsQuery {
override def persistenceIds(): pekko.stream.javadsl.Source[String, NotUsed] =
readJournal.persistenceIds().asJava
}
object DummyReadJournalProvider {
final val config: Config = ConfigFactory.parseString(s"""
${DummyReadJournal.Identifier} {
class = "${classOf[DummyReadJournalProvider].getCanonicalName}"
}
${DummyReadJournal.Identifier}2 {
class = "${classOf[DummyReadJournalProvider2].getCanonicalName}"
}
${DummyReadJournal.Identifier}3 {
class = "${classOf[DummyReadJournalProvider3].getCanonicalName}"
}
${DummyReadJournal.Identifier}4 {
class = "${classOf[DummyReadJournalProvider4].getCanonicalName}"
}
${DummyReadJournal.Identifier}5 {
class = "${classOf[DummyReadJournalProvider5].getCanonicalName}"
}
""")
}
class DummyReadJournalProvider(dummyValue: String) extends ReadJournalProvider {
// mandatory zero-arg constructor
def this() = this("dummy")
val readJournal = new DummyReadJournal(dummyValue)
override def scaladslReadJournal(): DummyReadJournal =
readJournal
val javaReadJournal = new DummyReadJournalForJava(readJournal)
override def javadslReadJournal(): DummyReadJournalForJava =
javaReadJournal
}
class DummyReadJournalProvider2(@unused sys: ExtendedActorSystem) extends DummyReadJournalProvider
class DummyReadJournalProvider3(@unused sys: ExtendedActorSystem, @unused conf: Config) extends DummyReadJournalProvider
class DummyReadJournalProvider4(@unused sys: ExtendedActorSystem, @unused conf: Config, @unused confPath: String)
extends DummyReadJournalProvider
class DummyReadJournalProvider5(@unused sys: ExtendedActorSystem) extends DummyReadJournalProvider
class CustomDummyReadJournalProvider5(@unused sys: ExtendedActorSystem) extends DummyReadJournalProvider("custom")

View file

@ -0,0 +1,38 @@
/*
* Copyright (C) 2019-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query
import java.util.UUID
import scala.util.Random
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
class OffsetSpec extends AnyWordSpecLike with Matchers {
"TimeBasedUUID offset" must {
"be ordered correctly" in {
val uuid1 = TimeBasedUUID(UUID.fromString("49225740-2019-11ea-a752-ffae2393b6e4")) // 2019-12-16T15:32:36.148Z[UTC]
val uuid2 = TimeBasedUUID(UUID.fromString("91be23d0-2019-11ea-a752-ffae2393b6e4")) // 2019-12-16T15:34:37.965Z[UTC]
val uuid3 = TimeBasedUUID(UUID.fromString("91f95810-2019-11ea-a752-ffae2393b6e4")) // 2019-12-16T15:34:38.353Z[UTC]
uuid1.value.timestamp() should be < uuid2.value.timestamp()
uuid2.value.timestamp() should be < uuid3.value.timestamp()
List(uuid2, uuid1, uuid3).sorted shouldEqual List(uuid1, uuid2, uuid3)
List(uuid3, uuid2, uuid1).sorted shouldEqual List(uuid1, uuid2, uuid3)
}
}
"Sequence offset" must {
"be ordered correctly" in {
val sequenceBasedList = List(1L, 2L, 3L).map(Sequence(_))
Random.shuffle(sequenceBasedList).sorted shouldEqual sequenceBasedList
}
}
}

View file

@ -0,0 +1,111 @@
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.journal.{ EventSeq, ReadEventAdapter }
class PersistenceQuerySpec extends AnyWordSpecLike with Matchers with BeforeAndAfterAll {
val eventAdaptersConfig =
s"""
|pekko.persistence.query.journal.dummy {
| event-adapters {
| adapt = ${classOf[PrefixStringWithPAdapter].getCanonicalName}
| }
|}
""".stripMargin
val customReadJournalPluginConfig =
s"""
|${DummyReadJournal.Identifier}5 {
| class = "${classOf[CustomDummyReadJournalProvider5].getCanonicalName}"
|}
|${DummyReadJournal.Identifier}6 {
| class = "${classOf[DummyReadJournalProvider].getCanonicalName}"
|}
""".stripMargin
"ReadJournal" must {
"be found by full config key" in {
withActorSystem() { system =>
val readJournalPluginConfig: Config = ConfigFactory.parseString(customReadJournalPluginConfig)
PersistenceQuery
.get(system)
.readJournalFor[DummyReadJournal](DummyReadJournal.Identifier, readJournalPluginConfig)
// other combinations of constructor parameters
PersistenceQuery
.get(system)
.readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "2", readJournalPluginConfig)
PersistenceQuery
.get(system)
.readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "3", readJournalPluginConfig)
PersistenceQuery
.get(system)
.readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "4", readJournalPluginConfig)
// config key existing within both the provided readJournalPluginConfig
// and the actorSystem config; the journal must then be created from the provided config
val dummyReadJournal5 = PersistenceQuery
.get(system)
.readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "5", readJournalPluginConfig)
dummyReadJournal5.dummyValue should equal("custom")
// config key coming directly from the provided readJournalPluginConfig;
// it does not exist within the actorSystem config
PersistenceQuery
.get(system)
.readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "6", readJournalPluginConfig)
}
}
"throw if unable to find query journal by config key" in {
withActorSystem() { system =>
intercept[IllegalArgumentException] {
PersistenceQuery.get(system).readJournalFor[DummyReadJournal](DummyReadJournal.Identifier + "-unknown")
}.getMessage should include("missing persistence plugin")
}
}
}
private val systemCounter = new AtomicInteger()
private def withActorSystem(conf: String = "")(block: ActorSystem => Unit): Unit = {
val config =
DummyReadJournalProvider.config
.withFallback(DummyJavaReadJournalProvider.config)
.withFallback(ConfigFactory.parseString(conf))
.withFallback(ConfigFactory.parseString(eventAdaptersConfig))
.withFallback(ConfigFactory.load())
val sys = ActorSystem(s"sys-${systemCounter.incrementAndGet()}", config)
try block(sys)
finally Await.ready(sys.terminate(), 10.seconds)
}
}
object ExampleQueryModels {
case class OldModel(value: String) {
def promote = NewModel(value)
}
case class NewModel(value: String)
}
class PrefixStringWithPAdapter extends ReadEventAdapter {
override def fromJournal(event: Any, manifest: String) = EventSeq.single("p-" + event)
}

View file

@ -0,0 +1,94 @@
/*
* Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.internal
import java.time.Instant
import java.util.UUID
import org.apache.pekko
import pekko.persistence.query.NoOffset
import pekko.persistence.query.Sequence
import pekko.persistence.query.TimeBasedUUID
import pekko.persistence.query.TimestampOffset
import pekko.persistence.query.typed.EventEnvelope
import pekko.serialization.SerializationExtension
import pekko.serialization.SerializerWithStringManifest
import pekko.testkit.PekkoSpec
class QuerySerializerSpec extends PekkoSpec {
private val serialization = SerializationExtension(system)
def verifySerialization(obj: AnyRef): Unit = {
val serializer = serialization.findSerializerFor(obj).asInstanceOf[SerializerWithStringManifest]
val manifest = serializer.manifest(obj)
val bytes = serialization.serialize(obj).get
val deserialized = serialization.deserialize(bytes, serializer.identifier, manifest).get
deserialized shouldBe obj
}
"Query serializer" should {
"serialize EventEnvelope with Sequence Offset" in {
verifySerialization(
EventEnvelope(Sequence(1L), "TestEntity|id1", 3L, "event1", System.currentTimeMillis(), "TestEntity", 5))
}
"serialize EventEnvelope with Meta" in {
verifySerialization(
new EventEnvelope(
Sequence(1L),
"TestEntity|id1",
3L,
Some("event1"),
System.currentTimeMillis(),
Some("some-meta"),
"TestEntity",
5))
}
"serialize EventEnvelope with Timestamp Offset" in {
verifySerialization(
EventEnvelope(
TimestampOffset(Instant.now(), Instant.now(), Map("pid1" -> 3)),
"TestEntity|id1",
3L,
"event1",
System.currentTimeMillis(),
"TestEntity",
5))
}
"serialize EventEnvelope with TimeBasedUUID Offset" in {
// 2019-12-16T15:32:36.148Z[UTC]
val uuidString = "49225740-2019-11ea-a752-ffae2393b6e4"
val timeUuidOffset = TimeBasedUUID(UUID.fromString(uuidString))
verifySerialization(
EventEnvelope(timeUuidOffset, "TestEntity|id1", 3L, "event1", System.currentTimeMillis(), "TestEntity", 5))
}
"serialize Sequence Offset" in {
verifySerialization(Sequence(0))
}
"serialize Timestamp Offset" in {
verifySerialization(TimestampOffset(Instant.now(), Instant.now(), Map("pid1" -> 3)))
verifySerialization(TimestampOffset(Instant.now(), Instant.now(), Map("pid1" -> 3, "pid2" -> 4)))
verifySerialization(TimestampOffset(Instant.now(), Instant.now(), Map.empty))
verifySerialization(TimestampOffset(Instant.now(), Map.empty))
}
"serialize TimeBasedUUID Offset" in {
// 2019-12-16T15:32:36.148Z[UTC]
val uuidString = "49225740-2019-11ea-a752-ffae2393b6e4"
val timeUuidOffset = TimeBasedUUID(UUID.fromString(uuidString))
verifySerialization(timeUuidOffset)
}
"serialize NoOffset" in {
verifySerialization(NoOffset)
}
}
}
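The same round trip the spec performs, as a standalone hedged sketch (object and system names assumed): serialize an Offset with whatever serializer the bindings select, then restore it.

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.query.Sequence
import pekko.serialization.{ SerializationExtension, SerializerWithStringManifest }

object OffsetRoundTripExample extends App {
  val system = ActorSystem("offset-round-trip")
  val serialization = SerializationExtension(system)
  val offset = Sequence(42L)
  // The reference.conf bindings route Offset subtypes to QuerySerializer (id 39).
  val serializer =
    serialization.findSerializerFor(offset).asInstanceOf[SerializerWithStringManifest]
  val bytes = serialization.serialize(offset).get
  val restored =
    serialization.deserialize(bytes, serializer.identifier, serializer.manifest(offset)).get
  assert(restored == offset)
  system.terminate()
}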

View file

@ -0,0 +1,81 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.journal.leveldb
import scala.concurrent.duration._
import org.apache.pekko
import pekko.persistence.query.PersistenceQuery
import pekko.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import pekko.persistence.query.scaladsl.PersistenceIdsQuery
import pekko.stream.testkit.scaladsl.TestSink
import pekko.testkit.PekkoSpec
import pekko.testkit.ImplicitSender
import scala.annotation.nowarn
object AllPersistenceIdsSpec {
val config = """
pekko.loglevel = INFO
pekko.persistence.journal.plugin = "pekko.persistence.journal.leveldb"
pekko.persistence.journal.leveldb.dir = "target/journal-AllPersistenceIdsSpec"
pekko.test.single-expect-default = 10s
# test uses Java serialization and is not a priority to rewrite
pekko.actor.allow-java-serialization = on
pekko.actor.warn-about-java-serializer-usage = off
"""
}
class AllPersistenceIdsSpec extends PekkoSpec(AllPersistenceIdsSpec.config) with Cleanup with ImplicitSender {
@nowarn("msg=deprecated")
val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
"Leveldb query AllPersistenceIds" must {
"implement standard AllPersistenceIdsQuery" in {
queries.isInstanceOf[PersistenceIdsQuery] should ===(true)
}
"find existing persistenceIds" in {
system.actorOf(TestActor.props("a")) ! "a1"
expectMsg("a1-done")
system.actorOf(TestActor.props("b")) ! "b1"
expectMsg("b1-done")
system.actorOf(TestActor.props("c")) ! "c1"
expectMsg("c1-done")
val src = queries.currentPersistenceIds()
val probe = src.runWith(TestSink.probe[String])
probe.within(10.seconds) {
probe.request(5).expectNextUnordered("a", "b", "c").expectComplete()
}
}
"find new persistenceIds" in {
// a, b, c created by previous step
system.actorOf(TestActor.props("d")) ! "d1"
expectMsg("d1-done")
val src = queries.persistenceIds()
val probe = src.runWith(TestSink.probe[String])
probe.within(10.seconds) {
probe.request(5).expectNextUnorderedN(List("a", "b", "c", "d"))
system.actorOf(TestActor.props("e")) ! "e1"
probe.expectNext("e")
val more = (1 to 100).map("f" + _)
more.foreach { p =>
system.actorOf(TestActor.props(p)) ! p
}
probe.request(100)
probe.expectNextUnorderedN(more)
}
}
}
}
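Outside the test kit the same query is an ordinary stream; a hedged sketch (names and directory assumed) of draining the current ids:

import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.query.PersistenceQuery
import pekko.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal

object CurrentPersistenceIdsExample extends App {
  implicit val system: ActorSystem = ActorSystem(
    "ids-example",
    ConfigFactory.parseString("""
      pekko.persistence.journal.plugin = "pekko.persistence.journal.leveldb"
      pekko.persistence.journal.leveldb.dir = "target/journal-ids-example"
      """).withFallback(ConfigFactory.load()))
  val queries =
    PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
  import system.dispatcher
  // currentPersistenceIds completes after the known ids; persistenceIds stays live.
  queries.currentPersistenceIds().runForeach(println).onComplete(_ => system.terminate())
}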

View file

@ -0,0 +1,27 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.journal.leveldb
import java.io.File
import org.apache.commons.io.FileUtils
import org.apache.pekko.testkit.PekkoSpec
trait Cleanup { this: PekkoSpec =>
val storageLocations =
List(
"pekko.persistence.journal.leveldb.dir",
"pekko.persistence.journal.leveldb-shared.store.dir",
"pekko.persistence.snapshot-store.local.dir").map(s => new File(system.settings.config.getString(s)))
override protected def atStartup(): Unit = {
storageLocations.foreach(FileUtils.deleteDirectory)
}
override protected def afterTermination(): Unit = {
storageLocations.foreach(FileUtils.deleteDirectory)
}
}
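A hypothetical spec showing the intended mix-in (class name assumed, same package as the specs above); atStartup and afterTermination then wipe the directories around each run:

import org.apache.pekko.testkit.{ ImplicitSender, PekkoSpec }

class MyLeveldbQuerySpec extends PekkoSpec(AllPersistenceIdsSpec.config) with Cleanup with ImplicitSender {
  "a query spec using Cleanup" must {
    "start from empty journal directories" in {
      // the storage locations listed above were deleted in atStartup()
    }
  }
}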

View file

@ -0,0 +1,226 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.journal.leveldb
import scala.concurrent.duration._
import org.apache.pekko
import pekko.actor.ActorRef
import pekko.persistence.query.EventEnvelope
import pekko.persistence.query.PersistenceQuery
import pekko.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import pekko.persistence.query.scaladsl.EventsByPersistenceIdQuery
import pekko.stream.testkit.scaladsl.TestSink
import pekko.testkit.PekkoSpec
import pekko.testkit.ImplicitSender
import scala.annotation.nowarn
object EventsByPersistenceIdSpec {
val config = """
pekko.loglevel = INFO
pekko.persistence.journal.plugin = "pekko.persistence.journal.leveldb"
pekko.persistence.journal.leveldb.dir = "target/journal-EventsByPersistenceIdSpec"
pekko.test.single-expect-default = 10s
pekko.persistence.query.journal.leveldb.refresh-interval = 1s
# test uses Java serialization and is not a priority to rewrite
pekko.actor.allow-java-serialization = on
pekko.actor.warn-about-java-serializer-usage = off
"""
}
class EventsByPersistenceIdSpec extends PekkoSpec(EventsByPersistenceIdSpec.config) with Cleanup with ImplicitSender {
@nowarn("msg=deprecated")
val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
def setup(persistenceId: String): ActorRef = {
val ref = setupEmpty(persistenceId)
ref ! s"$persistenceId-1"
ref ! s"$persistenceId-2"
ref ! s"$persistenceId-3"
expectMsg(s"$persistenceId-1-done")
expectMsg(s"$persistenceId-2-done")
expectMsg(s"$persistenceId-3-done")
ref
}
def setupEmpty(persistenceId: String): ActorRef = {
system.actorOf(TestActor.props(persistenceId))
}
"Leveldb query EventsByPersistenceId" must {
"implement standard EventsByTagQuery" in {
queries.isInstanceOf[EventsByTagQuery] should ===(true)
}
"find existing events" in {
setup("a")
val src = queries.currentEventsByPersistenceId("a", 0L, Long.MaxValue)
src
.map(_.event)
.runWith(TestSink.probe[Any])
.request(2)
.expectNext("a-1", "a-2")
.expectNoMessage(500.millis)
.request(2)
.expectNext("a-3")
.expectComplete()
}
"find existing events up to a sequence number" in {
setup("b")
val src = queries.currentEventsByPersistenceId("b", 0L, 2L)
src.map(_.event).runWith(TestSink.probe[Any]).request(5).expectNext("b-1", "b-2").expectComplete()
}
"not see new events after demand request" in {
val ref = setup("f")
val src = queries.currentEventsByPersistenceId("f", 0L, Long.MaxValue)
val probe =
src.map(_.event).runWith(TestSink.probe[Any]).request(2).expectNext("f-1", "f-2").expectNoMessage(100.millis)
ref ! "f-4"
expectMsg("f-4-done")
probe.expectNoMessage(100.millis).request(5).expectNext("f-3").expectComplete() // f-4 not seen
}
"return empty stream for cleaned journal from 0 to MaxLong" in {
val ref = setup("g1")
ref ! TestActor.DeleteCmd(3L)
expectMsg(s"${3L}-deleted")
val src = queries.currentEventsByPersistenceId("g1", 0L, Long.MaxValue)
src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectComplete()
}
"return empty stream for cleaned journal from 0 to 0" in {
val ref = setup("g2")
ref ! TestActor.DeleteCmd(3L)
expectMsg(s"${3L}-deleted")
val src = queries.currentEventsByPersistenceId("g2", 0L, 0L)
src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectComplete()
}
"return remaining values after partial journal cleanup" in {
val ref = setup("h")
ref ! TestActor.DeleteCmd(2L)
expectMsg(s"${2L}-deleted")
val src = queries.currentEventsByPersistenceId("h", 0L, Long.MaxValue)
src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectNext("h-3").expectComplete()
}
"return empty stream for empty journal" in {
setupEmpty("i")
val src = queries.currentEventsByPersistenceId("i", 0L, Long.MaxValue)
src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectComplete()
}
"return empty stream for journal from 0 to 0" in {
setup("k1")
val src = queries.currentEventsByPersistenceId("k1", 0L, 0L)
src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectComplete()
}
"return empty stream for empty journal from 0 to 0" in {
setupEmpty("k2")
val src = queries.currentEventsByPersistenceId("k2", 0L, 0L)
src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectComplete()
}
"return empty stream for journal from seqNo greater than highestSeqNo" in {
setup("l")
val src = queries.currentEventsByPersistenceId("l", 4L, 3L)
src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectComplete()
}
"include timestamp in EventEnvelope" in {
setup("m")
val src = queries.currentEventsByPersistenceId("m", 0L, Long.MaxValue)
val probe = src.runWith(TestSink.probe[EventEnvelope])
probe.request(5)
probe.expectNext().timestamp should be > 0L
probe.expectNext().timestamp should be > 0L
probe.expectNext().timestamp should be > 0L
probe.expectComplete()
}
}
"Leveldb live query EventsByPersistenceId" must {
"find new events" in {
val ref = setup("c")
val src = queries.eventsByPersistenceId("c", 0L, Long.MaxValue)
val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(5).expectNext("c-1", "c-2", "c-3")
ref ! "c-4"
expectMsg("c-4-done")
probe.expectNext("c-4")
}
"find new events up to a sequence number" in {
val ref = setup("d")
val src = queries.eventsByPersistenceId("d", 0L, 4L)
val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(5).expectNext("d-1", "d-2", "d-3")
ref ! "d-4"
expectMsg("d-4-done")
probe.expectNext("d-4").expectComplete()
}
"find new events after demand request" in {
val ref = setup("e")
val src = queries.eventsByPersistenceId("e", 0L, Long.MaxValue)
val probe =
src.map(_.event).runWith(TestSink.probe[Any]).request(2).expectNext("e-1", "e-2").expectNoMessage(100.millis)
ref ! "e-4"
expectMsg("e-4-done")
probe.expectNoMessage(100.millis).request(5).expectNext("e-3").expectNext("e-4")
}
"include timestamp in EventEnvelope" in {
setup("n")
val src = queries.eventsByPersistenceId("n", 0L, Long.MaxValue)
val probe = src.runWith(TestSink.probe[EventEnvelope])
probe.request(5)
probe.expectNext().timestamp should be > 0L
probe.expectNext().timestamp should be > 0L
probe.cancel()
}
"not complete for empty persistence id" in {
val src = queries.eventsByPersistenceId("o", 0L, Long.MaxValue)
val probe =
src.map(_.event).runWith(TestSink.probe[Any]).request(2)
probe.expectNoMessage(200.millis) // must not complete
val ref = setupEmpty("o")
ref ! "o-1"
expectMsg(s"o-1-done")
probe.cancel()
}
}
}
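A hedged live-tail sketch in application style (names and directory assumed): follow one persistence id until the stream is cancelled, mirroring the live queries above.

import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.query.PersistenceQuery
import pekko.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal

object TailEventsByPersistenceId extends App {
  implicit val system: ActorSystem = ActorSystem(
    "tail-example",
    ConfigFactory.parseString("""
      pekko.persistence.journal.plugin = "pekko.persistence.journal.leveldb"
      pekko.persistence.journal.leveldb.dir = "target/journal-tail-example"
      # string events in these examples rely on Java serialization, as in the specs
      pekko.actor.allow-java-serialization = on
      """).withFallback(ConfigFactory.load()))
  val queries =
    PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
  // eventsByPersistenceId stays live until cancelled;
  // currentEventsByPersistenceId would complete at the current end of the journal.
  queries
    .eventsByPersistenceId("a", 0L, Long.MaxValue)
    .map(env => s"${env.sequenceNr}: ${env.event}")
    .runForeach(println)
}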

View file

@ -0,0 +1,257 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.journal.leveldb
import scala.concurrent.duration._
import org.apache.pekko
import pekko.persistence.journal.Tagged
import pekko.persistence.journal.WriteEventAdapter
import pekko.persistence.query.EventEnvelope
import pekko.persistence.query.NoOffset
import pekko.persistence.query.PersistenceQuery
import pekko.persistence.query.Sequence
import pekko.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import pekko.persistence.query.scaladsl.EventsByTagQuery
import pekko.stream.testkit.scaladsl.TestSink
import pekko.testkit.PekkoSpec
import pekko.testkit.ImplicitSender
import scala.annotation.nowarn
object EventsByTagSpec {
val config = s"""
pekko.loglevel = INFO
pekko.persistence.journal.plugin = "pekko.persistence.journal.leveldb"
pekko.persistence.journal.leveldb {
dir = "target/journal-EventsByTagSpec"
event-adapters {
color-tagger = org.apache.pekko.persistence.query.journal.leveldb.ColorTagger
}
event-adapter-bindings = {
"java.lang.String" = color-tagger
}
}
pekko.persistence.query.journal.leveldb {
refresh-interval = 1s
max-buffer-size = 2
}
pekko.test.single-expect-default = 10s
leveldb-no-refresh = $${pekko.persistence.query.journal.leveldb}
leveldb-no-refresh {
refresh-interval = 10m
}
"""
}
class ColorTagger extends WriteEventAdapter {
val colors = Set("green", "black", "blue", "pink", "yellow")
override def toJournal(event: Any): Any = event match {
case s: String =>
val tags = colors.foldLeft(Set.empty[String])((acc, c) => if (s.contains(c)) acc + c else acc)
if (tags.isEmpty) event
else Tagged(event, tags)
case _ => event
}
override def manifest(event: Any): String = ""
}
class EventsByTagSpec extends PekkoSpec(EventsByTagSpec.config) with Cleanup with ImplicitSender {
@nowarn("msg=deprecated")
val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
"Leveldb query EventsByTag" must {
"implement standard EventsByTagQuery" in {
queries.isInstanceOf[EventsByTagQuery] should ===(true)
}
"find existing events" in {
val a = system.actorOf(TestActor.props("a"))
val b = system.actorOf(TestActor.props("b"))
a ! "hello"
expectMsg(s"hello-done")
a ! "a green apple"
expectMsg(s"a green apple-done")
b ! "a black car"
expectMsg(s"a black car-done")
a ! "a green banana"
expectMsg(s"a green banana-done")
b ! "a green leaf"
expectMsg(s"a green leaf-done")
val greenSrc = queries.currentEventsByTag(tag = "green", offset = NoOffset)
greenSrc
.runWith(TestSink.probe[Any])
.request(2)
.expectNext(EventEnvelope(Sequence(1L), "a", 2L, "a green apple", 0L))
.expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana", 0L))
.expectNoMessage(500.millis)
.request(2)
.expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf", 0L))
.expectComplete()
val blackSrc = queries.currentEventsByTag(tag = "black", offset = Sequence(0L))
blackSrc
.runWith(TestSink.probe[Any])
.request(5)
.expectNext(EventEnvelope(Sequence(1L), "b", 1L, "a black car", 0L))
.expectComplete()
}
"not see new events after demand request" in {
val c = system.actorOf(TestActor.props("c"))
val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(0L))
val probe = greenSrc
.runWith(TestSink.probe[Any])
.request(2)
.expectNext(EventEnvelope(Sequence(1L), "a", 2L, "a green apple", 0L))
.expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana", 0L))
.expectNoMessage(100.millis)
c ! "a green cucumber"
expectMsg(s"a green cucumber-done")
probe
.expectNoMessage(100.millis)
.request(5)
.expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf", 0L))
.expectComplete() // green cucumber not seen
}
"find events from offset (exclusive)" in {
val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(2L))
greenSrc
.runWith(TestSink.probe[Any])
.request(10)
// note that the banana event is not included, since the offset is exclusive
.expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf", 0L))
.expectNext(EventEnvelope(Sequence(4L), "c", 1L, "a green cucumber", 0L))
.expectComplete()
}
"buffer elements until demand" in {
val a = system.actorOf(TestActor.props("z"))
a ! "a pink apple"
expectMsg(s"a pink apple-done")
a ! "a pink banana"
expectMsg(s"a pink banana-done")
a ! "a pink orange"
expectMsg(s"a pink orange-done")
val pinkSrc = queries.currentEventsByTag(tag = "pink")
val probe = pinkSrc.runWith(TestSink.probe[Any])
probe.request(1).expectNext(EventEnvelope(Sequence(1L), "z", 1L, "a pink apple", 0L))
system.log.info("delay before demand")
probe
.expectNoMessage(200.millis)
.request(3)
.expectNext(EventEnvelope(Sequence(2L), "z", 2L, "a pink banana", 0L))
.expectNext(EventEnvelope(Sequence(3L), "z", 3L, "a pink orange", 0L))
.expectComplete()
}
"include timestamp in EventEnvelope" in {
system.actorOf(TestActor.props("testTimestamp"))
val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(0L))
val probe = greenSrc.runWith(TestSink.probe[EventEnvelope])
probe.request(2)
probe.expectNext().timestamp should be > 0L
probe.expectNext().timestamp should be > 0L
probe.cancel()
}
}
"Leveldb live query EventsByTag" must {
"find new events" in {
val d = system.actorOf(TestActor.props("d"))
val blackSrc = queries.eventsByTag(tag = "black", offset = NoOffset)
val probe = blackSrc.runWith(TestSink.probe[Any])
try {
probe.request(2).expectNext(EventEnvelope(Sequence(1L), "b", 1L, "a black car", 0L)).expectNoMessage(100.millis)
d ! "a black dog"
expectMsg(s"a black dog-done")
d ! "a black night"
expectMsg(s"a black night-done")
probe
.expectNext(EventEnvelope(Sequence(2L), "d", 1L, "a black dog", 0L))
.expectNoMessage(100.millis)
.request(10)
.expectNext(EventEnvelope(Sequence(3L), "d", 2L, "a black night", 0L))
} finally {
probe.cancel()
}
}
"find events from offset (exclusive)" in {
val greenSrc = queries.eventsByTag(tag = "green", offset = Sequence(2L))
val probe = greenSrc.runWith(TestSink.probe[Any])
try {
probe
.request(10)
// note that the banana event is not included, since the offset is exclusive
.expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf", 0L))
.expectNext(EventEnvelope(Sequence(4L), "c", 1L, "a green cucumber", 0L))
.expectNoMessage(100.millis)
} finally {
probe.cancel()
}
}
"finds events without refresh" in {
@nowarn("msg=deprecated")
val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal]("leveldb-no-refresh")
val d = system.actorOf(TestActor.props("y"))
d ! "a yellow car"
expectMsg("a yellow car-done")
val yellowSrc = queries.eventsByTag(tag = "yellow", offset = NoOffset)
val probe = yellowSrc
.runWith(TestSink.probe[Any])
.request(2)
.expectNext(EventEnvelope(Sequence(1L), "y", 1L, "a yellow car", 0L))
.expectNoMessage(100.millis)
d ! "a yellow dog"
expectMsg(s"a yellow dog-done")
d ! "a yellow night"
expectMsg(s"a yellow night-done")
probe
.expectNext(EventEnvelope(Sequence(2L), "y", 2L, "a yellow dog", 0L))
.expectNoMessage(100.millis)
.request(10)
.expectNext(EventEnvelope(Sequence(3L), "y", 3L, "a yellow night", 0L))
probe.cancel()
}
"not complete for empty stream" in {
val src = queries.eventsByTag(tag = "red", offset = NoOffset)
val probe =
src.map(_.event).runWith(TestSink.probe[Any]).request(2)
probe.expectNoMessage(200.millis)
probe.cancel()
}
}
}
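A hedged resumption sketch (names assumed): restart an eventsByTag stream from the last offset the application recorded; Sequence offsets are exclusive, as the specs above assert.

import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.query.{ Offset, PersistenceQuery, Sequence }
import pekko.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal

object ResumeEventsByTag extends App {
  implicit val system: ActorSystem = ActorSystem(
    "resume-example",
    ConfigFactory.parseString("""
      pekko.persistence.journal.plugin = "pekko.persistence.journal.leveldb"
      pekko.persistence.journal.leveldb.dir = "target/journal-resume-example"
      pekko.actor.allow-java-serialization = on
      """).withFallback(ConfigFactory.load()))
  val queries =
    PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
  // Hypothetical: the last offset would normally come from the application's own offset store.
  val lastProcessed: Offset = Sequence(2L)
  queries
    .eventsByTag("green", lastProcessed) // resumes after Sequence(2L), i.e. at 3L
    .runForeach(env => println(s"${env.offset}: ${env.event}"))
}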

View file

@ -0,0 +1,37 @@
/*
* Copyright (C) 2015-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.query.journal.leveldb
import org.apache.pekko
import pekko.actor.Props
import pekko.persistence.PersistentActor
object TestActor {
def props(persistenceId: String): Props =
Props(new TestActor(persistenceId))
case class DeleteCmd(toSeqNr: Long = Long.MaxValue)
}
class TestActor(override val persistenceId: String) extends PersistentActor {
import TestActor.DeleteCmd
val receiveRecover: Receive = {
case _: String =>
}
val receiveCommand: Receive = {
case DeleteCmd(toSeqNr) =>
deleteMessages(toSeqNr)
sender() ! s"$toSeqNr-deleted"
case cmd: String =>
persist(cmd) { evt =>
sender() ! s"$evt-done"
}
}
}
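A small hedged demo of the helper above (system name and directory assumed). With no sender in scope, the "-done" and "-deleted" replies end up in dead letters; in the specs they go to the test probe.

import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.actor.ActorSystem

object TestActorDemo extends App {
  val system = ActorSystem(
    "test-actor-demo",
    ConfigFactory.parseString("""
      pekko.persistence.journal.plugin = "pekko.persistence.journal.leveldb"
      pekko.persistence.journal.leveldb.dir = "target/journal-demo"
      pekko.actor.allow-java-serialization = on
      """).withFallback(ConfigFactory.load()))
  val ref = system.actorOf(TestActor.props("sample"))
  ref ! "sample-1"              // persisted, then "sample-1-done" is sent to the sender
  ref ! TestActor.DeleteCmd(1L) // "1-deleted" is sent to the sender after deletion
}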