Merge pull request #23563 from nick-nachos/wip-21677-leveldbJournalCompaction
Added journal compaction behavior for LevelDB #21677
This commit is contained in:
commit
f85a124a17
10 changed files with 459 additions and 4 deletions
|
|
@ -1271,6 +1271,13 @@ directory. This location can be changed by configuration where the specified pat
|
|||
|
||||
With this plugin, each actor system runs its own private LevelDB instance.
|
||||
|
||||
One peculiarity of LevelDB is that the deletion operation does not remove messages from the journal, but adds
|
||||
a "tombstone" for each deleted message instead. In the case of heavy journal usage, especially one including frequent
|
||||
deletes, this may be an issue as users may find themselves dealing with continuously increasing journal sizes. To
|
||||
this end, LevelDB offers a special journal compaction function that is exposed via the following configuration:
|
||||
|
||||
@@snip [PersistencePluginDocSpec.scala]($code$/scala/docs/persistence/PersistencePluginDocSpec.scala) { #compaction-intervals-config }
|
||||
|
||||
<a id="shared-leveldb-journal"></a>
|
||||
### Shared LevelDB journal
|
||||
|
||||
|
|
|
|||
|
|
@ -47,6 +47,18 @@ object PersistencePluginDocSpec {
|
|||
//#native-config
|
||||
akka.persistence.journal.leveldb.native = off
|
||||
//#native-config
|
||||
|
||||
//#compaction-intervals-config
|
||||
# Number of deleted messages per persistence id that will trigger journal compaction
|
||||
akka.persistence.journal.leveldb.compaction-intervals {
|
||||
persistence-id-1 = 100
|
||||
persistence-id-2 = 200
|
||||
# ...
|
||||
persistence-id-N = 1000
|
||||
# use wildcards to match unspecified persistence ids, if any
|
||||
"*" = 250
|
||||
}
|
||||
//#compaction-intervals-config
|
||||
"""
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,14 @@
|
|||
# LevelDB journal compaction #21677
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.journal.leveldb.LeveldbStore.compactionIntervals")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.journal.leveldb.LeveldbStore.akka$persistence$journal$leveldb$LeveldbStore$_setter_$compactionIntervals_=")
|
||||
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.compactionLimit")
|
||||
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.compactionSegment")
|
||||
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.akka$persistence$journal$leveldb$CompactionSegmentManagement$$latestCompactionSegments")
|
||||
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.compactionIntervals")
|
||||
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.updateCompactionSegment")
|
||||
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.akka$persistence$journal$leveldb$CompactionSegmentManagement$$latestCompactionSegments_=")
|
||||
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.mustCompact")
|
||||
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.actor.ActorLogging.akka$actor$ActorLogging$$_log_=")
|
||||
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.actor.ActorLogging.log")
|
||||
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.actor.ActorLogging.akka$actor$ActorLogging$$_log")
|
||||
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.LeveldbCompaction.receiveCompactionInternal")
|
||||
|
|
@ -244,6 +244,9 @@ akka.persistence.journal.leveldb {
|
|||
checksum = off
|
||||
# Native LevelDB (via JNI) or LevelDB Java port.
|
||||
native = on
|
||||
# Number of deleted messages per persistence id that will trigger journal compaction
|
||||
compaction-intervals {
|
||||
}
|
||||
}
|
||||
|
||||
# Shared LevelDB journal plugin (for testing only).
|
||||
|
|
@ -268,6 +271,9 @@ akka.persistence.journal.leveldb-shared {
|
|||
checksum = off
|
||||
# Native LevelDB (via JNI) or LevelDB Java port.
|
||||
native = on
|
||||
# Number of deleted messages per persistence id that will trigger journal compaction
|
||||
compaction-intervals {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,97 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.persistence.journal.leveldb
|
||||
|
||||
import akka.actor.{ Actor, ActorLogging }
|
||||
|
||||
private[persistence] object LeveldbCompaction {
|
||||
|
||||
case class TryCompactLeveldb(persistenceId: String, toSeqNr: Long)
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* Exposure of LevelDB compaction capability to reduce journal size upon message deletions.
|
||||
*/
|
||||
private[persistence] trait LeveldbCompaction extends Actor with ActorLogging with CompactionSegmentManagement {
|
||||
this: LeveldbStore ⇒
|
||||
|
||||
import Key._
|
||||
import LeveldbCompaction._
|
||||
|
||||
def receiveCompactionInternal: Receive = {
|
||||
case TryCompactLeveldb(persistenceId, toSeqNr) ⇒
|
||||
tryCompactOnDelete(persistenceId, toSeqNr)
|
||||
}
|
||||
|
||||
private def tryCompactOnDelete(persistenceId: String, toSeqNr: Long): Unit = {
|
||||
if (mustCompact(persistenceId, toSeqNr)) {
|
||||
val limit = compactionLimit(persistenceId, toSeqNr)
|
||||
log.info("Starting compaction for persistence id [{}] up to sequence number [{}]", persistenceId, limit)
|
||||
val start = keyToBytes(Key(numericId(persistenceId), 0, 0))
|
||||
val end = keyToBytes(Key(numericId(persistenceId), limit, 0))
|
||||
leveldb.compactRange(start, end)
|
||||
updateCompactionSegment(persistenceId, compactionSegment(persistenceId, limit))
|
||||
log.info("Compaction for persistence id [{}] up to sequence number [{}] is complete", persistenceId, limit)
|
||||
} else {
|
||||
log.debug("No compaction required yet for persistence id [{}] up to sequence number [{}]", persistenceId, toSeqNr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* Calculates and stores info of compacted "segments" per persistence id.
|
||||
*
|
||||
* Assuming a compaction interval N for a given persistence id, then compaction is to be performed
|
||||
* for segments of message sequence numbers according to the following pattern:
|
||||
*
|
||||
* [0, N), [N, 2*N), ... , [m*N, (m + 1)*N)
|
||||
*
|
||||
* Once deletion is performed up to a sequence number 'toSeqNr', then 'toSeqNr' will be used to determine the
|
||||
* rightmost segment up to which compaction will be performed. Eligible segments for compaction are only
|
||||
* considered to be those which include sequence numbers up to 'toSeqNr' AND whose size is equal to N (the compaction
|
||||
* interval). This rule implies that if 'toSeqNr' spans an incomplete portion of a rightmost segment, then
|
||||
* that segment will be omitted from the pending compaction, and will be included into the next one.
|
||||
*
|
||||
*/
|
||||
private[persistence] trait CompactionSegmentManagement {
|
||||
|
||||
import CompactionSegmentManagement._
|
||||
|
||||
private[this] var latestCompactionSegments = Map.empty[String, Long]
|
||||
|
||||
def compactionIntervals: Map[String, Long]
|
||||
|
||||
def updateCompactionSegment(persistenceId: String, compactionSegment: Long): Unit = {
|
||||
latestCompactionSegments += persistenceId → compactionSegment
|
||||
}
|
||||
|
||||
def compactionLimit(persistenceId: String, toSeqNr: Long): Long = {
|
||||
val interval = compactionInterval(persistenceId)
|
||||
(toSeqNr + 1) / interval * interval - 1
|
||||
}
|
||||
|
||||
def compactionSegment(persistenceId: String, toSeqNr: Long): Long = (toSeqNr + 1) / compactionInterval(persistenceId)
|
||||
|
||||
def mustCompact(persistenceId: String, toSeqNr: Long): Boolean =
|
||||
isCompactionEnabled(persistenceId) && isCompactionRequired(persistenceId, toSeqNr)
|
||||
|
||||
private def isCompactionEnabled(persistenceId: String): Boolean = compactionInterval(persistenceId) > 0L
|
||||
|
||||
private def isCompactionRequired(persistenceId: String, toSeqNr: Long): Boolean =
|
||||
compactionSegment(persistenceId, toSeqNr) > latestCompactionSegment(persistenceId)
|
||||
|
||||
private def latestCompactionSegment(persistenceId: String): Long = latestCompactionSegments.getOrElse(persistenceId, 0L)
|
||||
|
||||
private def compactionInterval(persistenceId: String): Long =
|
||||
compactionIntervals.getOrElse(persistenceId, compactionIntervals.getOrElse(Wildcard, 0L))
|
||||
}
|
||||
|
||||
private[persistence] object CompactionSegmentManagement {
|
||||
val Wildcard = "*"
|
||||
}
|
||||
|
|
@ -29,7 +29,7 @@ private[persistence] class LeveldbJournal(cfg: Config) extends AsyncWriteJournal
|
|||
if (cfg ne LeveldbStore.emptyConfig) cfg
|
||||
else context.system.settings.config.getConfig("akka.persistence.journal.leveldb")
|
||||
|
||||
override def receivePluginInternal: Receive = {
|
||||
override def receivePluginInternal: Receive = receiveCompactionInternal orElse {
|
||||
case r @ ReplayTaggedMessages(fromSequenceNr, toSequenceNr, max, tag, replyTo) ⇒
|
||||
import context.dispatcher
|
||||
val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1)
|
||||
|
|
|
|||
|
|
@ -15,20 +15,25 @@ import akka.serialization.SerializationExtension
|
|||
import org.iq80.leveldb._
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.util._
|
||||
import scala.concurrent.Future
|
||||
import scala.util.control.NonFatal
|
||||
import akka.persistence.journal.Tagged
|
||||
import com.typesafe.config.{ Config, ConfigFactory }
|
||||
import com.typesafe.config.{ Config, ConfigFactory, ConfigObject }
|
||||
|
||||
private[persistence] object LeveldbStore {
|
||||
val emptyConfig = ConfigFactory.empty()
|
||||
|
||||
def toCompactionIntervalMap(obj: ConfigObject): Map[String, Long] = {
|
||||
obj.unwrapped().asScala.map(entry ⇒ (entry._1, java.lang.Long.parseLong(entry._2.toString))).toMap
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*/
|
||||
private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with LeveldbIdMapping with LeveldbRecovery {
|
||||
private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with LeveldbIdMapping with LeveldbRecovery with LeveldbCompaction {
|
||||
|
||||
def prepareConfig: Config
|
||||
|
||||
|
|
@ -40,6 +45,7 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with
|
|||
val leveldbWriteOptions = new WriteOptions().sync(config.getBoolean("fsync")).snapshot(false)
|
||||
val leveldbDir = new File(config.getString("dir"))
|
||||
var leveldb: DB = _
|
||||
override val compactionIntervals: Map[String, Long] = LeveldbStore.toCompactionIntervalMap(config.getObject("compaction-intervals"))
|
||||
|
||||
private val persistenceIdSubscribers = new mutable.HashMap[String, mutable.Set[ActorRef]] with mutable.MultiMap[String, ActorRef]
|
||||
private val tagSubscribers = new mutable.HashMap[String, mutable.Set[ActorRef]] with mutable.MultiMap[String, ActorRef]
|
||||
|
|
@ -112,6 +118,8 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with
|
|||
batch.delete(keyToBytes(Key(nid, sequenceNr, 0)))
|
||||
sequenceNr += 1
|
||||
}
|
||||
|
||||
self ! LeveldbCompaction.TryCompactLeveldb(persistenceId, toSeqNr)
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ class SharedLeveldbStore(cfg: Config) extends LeveldbStore {
|
|||
if (cfg ne LeveldbStore.emptyConfig) cfg.getConfig("store")
|
||||
else context.system.settings.config.getConfig("akka.persistence.journal.leveldb-shared.store")
|
||||
|
||||
def receive = {
|
||||
def receive = receiveCompactionInternal orElse {
|
||||
case WriteMessages(messages) ⇒
|
||||
// TODO it would be nice to DRY this with AsyncWriteJournal, but this is using
|
||||
// AsyncWriteProxy message protocol
|
||||
|
|
|
|||
|
|
@ -0,0 +1,105 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.persistence.journal.leveldb
|
||||
|
||||
import org.scalatest.WordSpec
|
||||
|
||||
class CompactionSegmentManagementSpec extends WordSpec {
|
||||
|
||||
"A CompactionSegmentManagement compatible object" must {
|
||||
"ignore persistence ids without declared compaction intervals" in {
|
||||
val intervals = Map(
|
||||
"persistence_id-1" → 1L,
|
||||
"persistence_id-2" → 1L,
|
||||
"persistence_id-3" → 1L
|
||||
)
|
||||
val compactionStub = new CompactionSegmentManagement {
|
||||
override def compactionIntervals: Map[String, Long] = intervals
|
||||
}
|
||||
assert(intervals.contains("persistence_id-1") && compactionStub.mustCompact("persistence_id-1", 1))
|
||||
assert(intervals.contains("persistence_id-2") && compactionStub.mustCompact("persistence_id-2", 1))
|
||||
assert(intervals.contains("persistence_id-3") && compactionStub.mustCompact("persistence_id-3", 1))
|
||||
assert(!intervals.contains("persistence_id-4") && !compactionStub.mustCompact("persistence_id-4", 1))
|
||||
}
|
||||
|
||||
"ignore persistence ids whose compaction intervals are less or equal to zero" in {
|
||||
val intervals = Map(
|
||||
"persistence_id-1" → 1L,
|
||||
"persistence_id-2" → 0L,
|
||||
"persistence_id-3" → -1L
|
||||
)
|
||||
val compactionStub = new CompactionSegmentManagement {
|
||||
override def compactionIntervals: Map[String, Long] = intervals
|
||||
}
|
||||
assert(intervals("persistence_id-1") > 0 && compactionStub.mustCompact("persistence_id-1", 1))
|
||||
assert(intervals("persistence_id-2") <= 0 && !compactionStub.mustCompact("persistence_id-2", 1))
|
||||
assert(intervals("persistence_id-3") <= 0 && !compactionStub.mustCompact("persistence_id-3", 1))
|
||||
}
|
||||
|
||||
"allow for wildcard configuration" in {
|
||||
val intervals = Map(
|
||||
"persistence_id-1" → 1L,
|
||||
"persistence_id-2" → 1L,
|
||||
"*" → 1L
|
||||
)
|
||||
val compactionStub = new CompactionSegmentManagement {
|
||||
override def compactionIntervals: Map[String, Long] = intervals
|
||||
}
|
||||
assert(intervals.contains("persistence_id-1") && compactionStub.mustCompact("persistence_id-1", 1))
|
||||
assert(intervals.contains("persistence_id-2") && compactionStub.mustCompact("persistence_id-2", 1))
|
||||
assert(!intervals.contains("persistence_id-3") && compactionStub.mustCompact("persistence_id-3", 1))
|
||||
}
|
||||
|
||||
"not permit compaction before thresholds are exceeded" in {
|
||||
val namedIntervals = Map("persistence_id-1" → 5L, "persistence_id-2" → 4L)
|
||||
val intervals = namedIntervals + ("*" → 3L)
|
||||
val compactionStub = new CompactionSegmentManagement {
|
||||
override def compactionIntervals: Map[String, Long] = intervals
|
||||
}
|
||||
val expectedIntervals = namedIntervals + ("persistence_id-3" → 3L, "persistence_id-4" → 3L)
|
||||
|
||||
for ((id, interval) ← expectedIntervals) {
|
||||
var segment = 0
|
||||
|
||||
for (i ← 0L.until(4 * interval)) {
|
||||
if ((i + 1) % interval == 0) {
|
||||
assert(compactionStub.mustCompact(id, i), s"must compact for [$id] when toSeqNr is [$i]")
|
||||
segment += 1
|
||||
compactionStub.updateCompactionSegment(id, segment)
|
||||
} else {
|
||||
assert(!compactionStub.mustCompact(id, i), s"must not compact for [$id] when toSeqNr is [$i]")
|
||||
}
|
||||
|
||||
assert(compactionStub.compactionSegment(id, i) == segment, s"for [$id] when toSeqNr is [$i]")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"keep track of latest segment with respect to the limit imposed by the upper limit of deletion" in {
|
||||
val id = "persistence_id-1"
|
||||
val interval = 5L
|
||||
val compactionStub = new CompactionSegmentManagement {
|
||||
override def compactionIntervals: Map[String, Long] = Map(id → interval)
|
||||
}
|
||||
val smallJump = interval / 2
|
||||
val midJump = interval
|
||||
val bigJump = interval * 2
|
||||
var toSeqNr = smallJump
|
||||
var segment = 0
|
||||
assert(!compactionStub.mustCompact(id, toSeqNr))
|
||||
assert(compactionStub.compactionSegment(id, toSeqNr) == segment)
|
||||
toSeqNr += midJump
|
||||
assert(compactionStub.mustCompact(id, toSeqNr))
|
||||
segment += 1
|
||||
assert(compactionStub.compactionSegment(id, toSeqNr) == segment)
|
||||
compactionStub.updateCompactionSegment(id, segment)
|
||||
toSeqNr += bigJump
|
||||
assert(compactionStub.mustCompact(id, toSeqNr))
|
||||
segment += 2
|
||||
assert(compactionStub.compactionSegment(id, toSeqNr) == segment)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,206 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.persistence.journal.leveldb
|
||||
|
||||
import java.io.File
|
||||
|
||||
import akka.actor.{ ActorLogging, ActorRef, ActorSystem, Props }
|
||||
import akka.persistence.journal.leveldb.JournalCompactionSpec.EventLogger._
|
||||
import akka.persistence.journal.leveldb.JournalCompactionSpec.SpecComponentBuilder
|
||||
import akka.persistence.{ DeleteMessagesSuccess, PersistenceSpec, PersistentActor }
|
||||
import akka.testkit.TestProbe
|
||||
import com.typesafe.config.Config
|
||||
import org.apache.commons.io.FileUtils
|
||||
|
||||
import scala.util.Random
|
||||
|
||||
class JournalNoCompactionSpec extends JournalCompactionSpecBase(SpecComponentBuilder("leveldb-JournalNoCompactionSpec")) {
|
||||
|
||||
"A LevelDB-based persistent actor" must {
|
||||
"NOT compact the journal if compaction is not activated by configuration" in {
|
||||
val watcher = TestProbe("watcher")
|
||||
val logger = createLogger(watcher.ref)
|
||||
val totalMessages = 2000
|
||||
val deletionBatchSize = 500
|
||||
var oldJournalSize = 0L
|
||||
|
||||
for (i ← 0L.until(totalMessages)) {
|
||||
logger ! Generate
|
||||
watcher.expectMsg(Generated(i))
|
||||
}
|
||||
|
||||
var newJournalSize = calculateJournalSize()
|
||||
assert(oldJournalSize < newJournalSize)
|
||||
|
||||
var deletionSeqNr = deletionBatchSize - 1
|
||||
|
||||
while (deletionSeqNr < totalMessages) {
|
||||
logger ! Delete(deletionSeqNr)
|
||||
watcher.expectMsg(DeleteMessagesSuccess(deletionSeqNr))
|
||||
|
||||
oldJournalSize = newJournalSize
|
||||
newJournalSize = calculateJournalSize()
|
||||
assert(oldJournalSize <= newJournalSize)
|
||||
|
||||
deletionSeqNr += deletionBatchSize
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class JournalCompactionSpec extends JournalCompactionSpecBase(SpecComponentBuilder("leveldb-JournalCompactionSpec", 500)) {
|
||||
|
||||
"A LevelDB-based persistent actor" must {
|
||||
"compact the journal upon message deletions of configured persistence ids" in {
|
||||
val watcher = TestProbe("watcher")
|
||||
val logger = createLogger(watcher.ref)
|
||||
val totalMessages = 4 * builder.compactionInterval // 2000
|
||||
val deletionBatchSize = builder.compactionInterval // 500
|
||||
var oldJournalSize = 0L
|
||||
|
||||
for (i ← 0L.until(totalMessages)) {
|
||||
logger ! Generate
|
||||
watcher.expectMsg(Generated(i))
|
||||
}
|
||||
|
||||
var newJournalSize = calculateJournalSize()
|
||||
assert(oldJournalSize < newJournalSize)
|
||||
|
||||
var deletionSeqNr = deletionBatchSize - 1
|
||||
|
||||
while (deletionSeqNr < totalMessages) {
|
||||
logger ! Delete(deletionSeqNr)
|
||||
watcher.expectMsg(DeleteMessagesSuccess(deletionSeqNr))
|
||||
|
||||
oldJournalSize = newJournalSize
|
||||
newJournalSize = calculateJournalSize()
|
||||
assert(oldJournalSize > newJournalSize)
|
||||
|
||||
deletionSeqNr += deletionBatchSize
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class JournalCompactionThresholdSpec extends JournalCompactionSpecBase(SpecComponentBuilder("leveldb-JournalCompactionThresholdSpec", 500)) {
|
||||
|
||||
"A LevelDB-based persistent actor" must {
|
||||
"compact the journal only after the threshold implied by the configured compaction interval has been exceeded" in {
|
||||
val watcher = TestProbe("watcher")
|
||||
val logger = createLogger(watcher.ref)
|
||||
val totalMessages = 2 * builder.compactionInterval // 1000
|
||||
val deletionBatchSize = builder.compactionInterval // 500
|
||||
var oldJournalSize = 0L
|
||||
|
||||
for (i ← 0L.until(totalMessages)) {
|
||||
logger ! Generate
|
||||
watcher.expectMsg(Generated(i))
|
||||
}
|
||||
|
||||
var newJournalSize = calculateJournalSize()
|
||||
assert(oldJournalSize < newJournalSize)
|
||||
|
||||
var deletionSeqNr = deletionBatchSize - 2
|
||||
|
||||
while (deletionSeqNr < totalMessages) {
|
||||
logger ! Delete(deletionSeqNr)
|
||||
watcher.expectMsg(DeleteMessagesSuccess(deletionSeqNr))
|
||||
|
||||
oldJournalSize = newJournalSize
|
||||
newJournalSize = calculateJournalSize()
|
||||
|
||||
if (deletionSeqNr < builder.compactionInterval - 1) {
|
||||
assert(oldJournalSize <= newJournalSize)
|
||||
} else {
|
||||
assert(oldJournalSize > newJournalSize)
|
||||
}
|
||||
|
||||
deletionSeqNr += deletionBatchSize
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
abstract class JournalCompactionSpecBase(val builder: SpecComponentBuilder) extends PersistenceSpec(builder.config) {
|
||||
|
||||
def createLogger(watcher: ActorRef): ActorRef = builder.createLogger(system, watcher)
|
||||
|
||||
def calculateJournalSize(): Long = FileUtils.sizeOfDirectory(journalDir)
|
||||
|
||||
def journalDir: File = {
|
||||
val relativePath = system.settings.config.getString("akka.persistence.journal.leveldb.dir")
|
||||
new File(relativePath).getAbsoluteFile
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object JournalCompactionSpec {
|
||||
|
||||
class SpecComponentBuilder(val specId: String, val compactionInterval: Long) {
|
||||
|
||||
def config: Config = {
|
||||
PersistenceSpec.config("leveldb", specId, extraConfig = Some(
|
||||
s"""
|
||||
| akka.persistence.journal.leveldb.compaction-intervals.$specId = $compactionInterval
|
||||
""".stripMargin
|
||||
))
|
||||
}
|
||||
|
||||
def createLogger(system: ActorSystem, watcher: ActorRef): ActorRef = {
|
||||
system.actorOf(EventLogger.props(specId, watcher), "logger")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object SpecComponentBuilder {
|
||||
|
||||
def apply(specId: String): SpecComponentBuilder = apply(specId, 0)
|
||||
|
||||
def apply(specId: String, compactionInterval: Long): SpecComponentBuilder = {
|
||||
new SpecComponentBuilder(specId, compactionInterval)
|
||||
}
|
||||
}
|
||||
|
||||
object EventLogger {
|
||||
|
||||
case object Generate
|
||||
|
||||
case class Generated(seqNr: Long)
|
||||
|
||||
case class Delete(toSeqNr: Long)
|
||||
|
||||
case class Event(seqNr: Long, payload: String)
|
||||
|
||||
def props(specId: String, watcher: ActorRef): Props = Props(classOf[EventLogger], specId, watcher)
|
||||
}
|
||||
|
||||
class EventLogger(specId: String, watcher: ActorRef) extends PersistentActor with ActorLogging {
|
||||
|
||||
import EventLogger._
|
||||
|
||||
override def receiveRecover: Receive = {
|
||||
case Event(seqNr, _) ⇒ log.info("Recovered event {}", seqNr)
|
||||
}
|
||||
|
||||
override def receiveCommand: Receive = {
|
||||
case Generate ⇒
|
||||
persist(Event(lastSequenceNr, randomText()))(onEventPersisted)
|
||||
case Delete(toSeqNr) ⇒
|
||||
log.info("Deleting messages up to {}", toSeqNr)
|
||||
deleteMessages(toSeqNr)
|
||||
case evt: DeleteMessagesSuccess ⇒
|
||||
watcher ! evt
|
||||
}
|
||||
|
||||
override def persistenceId: String = specId
|
||||
|
||||
private def onEventPersisted(evt: Event): Unit = {
|
||||
watcher ! Generated(evt.seqNr)
|
||||
}
|
||||
|
||||
private def randomText(): String = Random.nextString(1024)
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue