Added journal compaction behavior for LevelDB #21677

Nikos Nakas 2017-09-18 21:13:02 +00:00
parent bb106aa27e
commit 341b74e7c1
10 changed files with 459 additions and 4 deletions

View file

@@ -1271,6 +1271,13 @@ directory. This location can be changed by configuration where the specified pat
With this plugin, each actor system runs its own private LevelDB instance.
One peculiarity of LevelDB is that the deletion operation does not remove messages from the journal, but adds
a "tombstone" for each deleted message instead. In the case of heavy journal usage, especially one including frequent
deletes, this may be an issue as users may find themselves dealing with continuously increasing journal sizes. To
this end, LevelDB offers a special journal compaction function that is exposed via the following configuration:
@@snip [PersistencePluginDocSpec.scala]($code$/scala/docs/persistence/PersistencePluginDocSpec.scala) { #compaction-intervals-config }
<a id="shared-leveldb-journal"></a>
### Shared LevelDB journal

View file

@@ -47,6 +47,18 @@ object PersistencePluginDocSpec {
//#native-config
akka.persistence.journal.leveldb.native = off
//#native-config
//#compaction-intervals-config
# Number of deleted messages per persistence id that will trigger journal compaction
akka.persistence.journal.leveldb.compaction-intervals {
persistence-id-1 = 100
persistence-id-2 = 200
# ...
persistence-id-N = 1000
# use wildcards to match unspecified persistence ids, if any
"*" = 250
}
//#compaction-intervals-config
"""
}
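
To make the wildcard semantics concrete, here is a small standalone sketch (not part of the diff) of how an interval is resolved for a given persistence id — an exact key wins over the "*" fallback, mirroring the private compactionInterval lookup in CompactionSegmentManagement further down in this commit:

val intervals = Map("persistence-id-1" → 100L, "persistence-id-2" → 200L, "*" → 250L)
def intervalFor(id: String): Long = intervals.getOrElse(id, intervals.getOrElse("*", 0L))
intervalFor("persistence-id-1") // 100: exact match wins over the wildcard
intervalFor("some-other-id")    // 250: wildcard fallback
// without a "*" entry, unlisted ids resolve to 0, i.e. compaction stays off for them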

View file

@@ -0,0 +1,14 @@
# LevelDB journal compaction #21677
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.journal.leveldb.LeveldbStore.compactionIntervals")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.journal.leveldb.LeveldbStore.akka$persistence$journal$leveldb$LeveldbStore$_setter_$compactionIntervals_=")
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.compactionLimit")
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.compactionSegment")
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.akka$persistence$journal$leveldb$CompactionSegmentManagement$$latestCompactionSegments")
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.compactionIntervals")
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.updateCompactionSegment")
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.akka$persistence$journal$leveldb$CompactionSegmentManagement$$latestCompactionSegments_=")
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.mustCompact")
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.actor.ActorLogging.akka$actor$ActorLogging$$_log_=")
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.actor.ActorLogging.log")
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.actor.ActorLogging.akka$actor$ActorLogging$$_log")
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.LeveldbCompaction.receiveCompactionInternal")
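
These are MiMa (Migration Manager) filter entries acknowledging the binary-incompatible method additions introduced by the new traits. Excluding them is safe here because LeveldbStore, LeveldbCompaction and CompactionSegmentManagement are all private[persistence] internal API, as the diffs below show.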

View file

@@ -244,6 +244,9 @@ akka.persistence.journal.leveldb {
checksum = off
# Native LevelDB (via JNI) or LevelDB Java port.
native = on
# Number of deleted messages per persistence id that will trigger journal compaction
compaction-intervals {
}
}
# Shared LevelDB journal plugin (for testing only).
@@ -268,6 +271,9 @@ akka.persistence.journal.leveldb-shared {
checksum = off
# Native LevelDB (via JNI) or LevelDB Java port.
native = on
# Number of deleted messages per persistence id that will trigger journal compaction
compaction-intervals {
}
}
}
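
With the empty compaction-intervals blocks above, compaction stays off by default. A minimal application.conf sketch to switch it on — the persistence ids used here are hypothetical:

akka.persistence.journal.leveldb {
  compaction-intervals {
    # compact after every 1000 deleted messages of this (hypothetical) persistence id
    "my-persistence-id" = 1000
    # fallback interval for all other persistence ids
    "*" = 5000
  }
}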

View file

@@ -0,0 +1,97 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.persistence.journal.leveldb
import akka.actor.{ Actor, ActorLogging }
private[persistence] object LeveldbCompaction {
case class TryCompactLeveldb(persistenceId: String, toSeqNr: Long)
}
/**
* INTERNAL API.
*
* Exposure of LevelDB compaction capability to reduce journal size upon message deletions.
*/
private[persistence] trait LeveldbCompaction extends Actor with ActorLogging with CompactionSegmentManagement {
this: LeveldbStore ⇒
import Key._
import LeveldbCompaction._
def receiveCompactionInternal: Receive = {
case TryCompactLeveldb(persistenceId, toSeqNr) ⇒
tryCompactOnDelete(persistenceId, toSeqNr)
}
private def tryCompactOnDelete(persistenceId: String, toSeqNr: Long): Unit = {
if (mustCompact(persistenceId, toSeqNr)) {
val limit = compactionLimit(persistenceId, toSeqNr)
log.info("Starting compaction for persistence id [{}] up to sequence number [{}]", persistenceId, limit)
val start = keyToBytes(Key(numericId(persistenceId), 0, 0))
val end = keyToBytes(Key(numericId(persistenceId), limit, 0))
leveldb.compactRange(start, end)
updateCompactionSegment(persistenceId, compactionSegment(persistenceId, limit))
log.info("Compaction for persistence id [{}] up to sequence number [{}] is complete", persistenceId, limit)
} else {
log.debug("No compaction required yet for persistence id [{}] up to sequence number [{}]", persistenceId, toSeqNr)
}
}
}
/**
* INTERNAL API.
*
* Calculates and stores info of compacted "segments" per persistence id.
*
* Assuming a compaction interval N for a given persistence id, then compaction is to be performed
* for segments of message sequence numbers according to the following pattern:
*
* [0, N), [N, 2*N), ... , [m*N, (m + 1)*N)
*
* Once deletion is performed up to a sequence number 'toSeqNr', then 'toSeqNr' will be used to determine the
* rightmost segment up to which compaction will be performed. Only segments that contain
* sequence numbers up to 'toSeqNr' AND whose size equals N (the compaction
* interval) are eligible for compaction. This rule implies that if 'toSeqNr' covers only an incomplete portion of the
* rightmost segment, then that segment is omitted from the pending compaction and will be included in the next one.
*
*/
private[persistence] trait CompactionSegmentManagement {
import CompactionSegmentManagement._
private[this] var latestCompactionSegments = Map.empty[String, Long]
def compactionIntervals: Map[String, Long]
def updateCompactionSegment(persistenceId: String, compactionSegment: Long): Unit = {
latestCompactionSegments += persistenceId → compactionSegment
}
def compactionLimit(persistenceId: String, toSeqNr: Long): Long = {
val interval = compactionInterval(persistenceId)
(toSeqNr + 1) / interval * interval - 1
}
def compactionSegment(persistenceId: String, toSeqNr: Long): Long = (toSeqNr + 1) / compactionInterval(persistenceId)
def mustCompact(persistenceId: String, toSeqNr: Long): Boolean =
isCompactionEnabled(persistenceId) && isCompactionRequired(persistenceId, toSeqNr)
private def isCompactionEnabled(persistenceId: String): Boolean = compactionInterval(persistenceId) > 0L
private def isCompactionRequired(persistenceId: String, toSeqNr: Long): Boolean =
compactionSegment(persistenceId, toSeqNr) > latestCompactionSegment(persistenceId)
private def latestCompactionSegment(persistenceId: String): Long = latestCompactionSegments.getOrElse(persistenceId, 0L)
private def compactionInterval(persistenceId: String): Long =
compactionIntervals.getOrElse(persistenceId, compactionIntervals.getOrElse(Wildcard, 0L))
}
private[persistence] object CompactionSegmentManagement {
val Wildcard = "*"
}
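
A worked example of the segment arithmetic above, assuming a compaction interval of 100 for a hypothetical persistence id "pid" (instantiable like this from within the akka.persistence.journal.leveldb package, exactly as the spec below does; the comments show the evaluated values):

val stub = new CompactionSegmentManagement {
  override def compactionIntervals: Map[String, Long] = Map("pid" → 100L)
}
stub.mustCompact("pid", 42L)        // false: segment (42 + 1) / 100 = 0, latest compacted segment is also 0
stub.mustCompact("pid", 99L)        // true:  segment (99 + 1) / 100 = 1 > latest compacted segment 0
stub.compactionLimit("pid", 250L)   // 199:   covers only the complete segments [0, 100) and [100, 200)
stub.compactionSegment("pid", 250L) // 2:     the segment recorded once compaction ran up to 199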

View file

@@ -29,7 +29,7 @@ private[persistence] class LeveldbJournal(cfg: Config) extends AsyncWriteJournal
if (cfg ne LeveldbStore.emptyConfig) cfg
else context.system.settings.config.getConfig("akka.persistence.journal.leveldb")
-override def receivePluginInternal: Receive = {
+override def receivePluginInternal: Receive = receiveCompactionInternal orElse {
case r @ ReplayTaggedMessages(fromSequenceNr, toSequenceNr, max, tag, replyTo) ⇒
import context.dispatcher
val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1)
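
The new receivePluginInternal works by plain partial-function chaining — Receive is just PartialFunction[Any, Unit], so receiveCompactionInternal gets the first chance at each message and everything else falls through. A generic standalone sketch of the orElse behavior (not Akka code):

val compaction: PartialFunction[Any, Unit] = { case "compact" ⇒ println("compaction message") }
val plugin: PartialFunction[Any, Unit] = { case msg ⇒ println(s"plugin handles $msg") }
val receive = compaction orElse plugin
receive("compact") // handled by the compaction handler; the plugin handler never sees it
receive("replay")  // falls through to the plugin handler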

View file

@@ -15,20 +15,25 @@ import akka.serialization.SerializationExtension
import org.iq80.leveldb._
import scala.collection.immutable
import scala.collection.JavaConverters._
import scala.util._
import scala.concurrent.Future
import scala.util.control.NonFatal
import akka.persistence.journal.Tagged
-import com.typesafe.config.{ Config, ConfigFactory }
+import com.typesafe.config.{ Config, ConfigFactory, ConfigObject }
private[persistence] object LeveldbStore {
val emptyConfig = ConfigFactory.empty()
def toCompactionIntervalMap(obj: ConfigObject): Map[String, Long] = {
obj.unwrapped().asScala.map(entry ⇒ (entry._1, java.lang.Long.parseLong(entry._2.toString))).toMap
}
}
/**
* INTERNAL API.
*/
-private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with LeveldbIdMapping with LeveldbRecovery {
+private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with LeveldbIdMapping with LeveldbRecovery with LeveldbCompaction {
def prepareConfig: Config
@@ -40,6 +45,7 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with
val leveldbWriteOptions = new WriteOptions().sync(config.getBoolean("fsync")).snapshot(false)
val leveldbDir = new File(config.getString("dir"))
var leveldb: DB = _
override val compactionIntervals: Map[String, Long] = LeveldbStore.toCompactionIntervalMap(config.getObject("compaction-intervals"))
private val persistenceIdSubscribers = new mutable.HashMap[String, mutable.Set[ActorRef]] with mutable.MultiMap[String, ActorRef]
private val tagSubscribers = new mutable.HashMap[String, mutable.Set[ActorRef]] with mutable.MultiMap[String, ActorRef]
@@ -112,6 +118,8 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with
batch.delete(keyToBytes(Key(nid, sequenceNr, 0)))
sequenceNr += 1
}
self ! LeveldbCompaction.TryCompactLeveldb(persistenceId, toSeqNr)
}
}
} catch {
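
For illustration, a sketch of what toCompactionIntervalMap produces for the documented config shape (hypothetical values; callable like this only from within the akka.persistence package, since LeveldbStore is internal API):

import com.typesafe.config.ConfigFactory
val obj = ConfigFactory.parseString(
  """compaction-intervals {
    |  pid-1 = 100
    |  "*" = 250
    |}""".stripMargin).getObject("compaction-intervals")
LeveldbStore.toCompactionIntervalMap(obj) // Map("pid-1" → 100L, "*" → 250L)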

View file

@@ -28,7 +28,7 @@ class SharedLeveldbStore(cfg: Config) extends LeveldbStore {
if (cfg ne LeveldbStore.emptyConfig) cfg.getConfig("store")
else context.system.settings.config.getConfig("akka.persistence.journal.leveldb-shared.store")
-def receive = {
+def receive = receiveCompactionInternal orElse {
case WriteMessages(messages) ⇒
// TODO it would be nice to DRY this with AsyncWriteJournal, but this is using
// AsyncWriteProxy message protocol

View file

@@ -0,0 +1,105 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.persistence.journal.leveldb
import org.scalatest.WordSpec
class CompactionSegmentManagementSpec extends WordSpec {
"A CompactionSegmentManagement compatible object" must {
"ignore persistence ids without declared compaction intervals" in {
val intervals = Map(
"persistence_id-1" 1L,
"persistence_id-2" 1L,
"persistence_id-3" 1L
)
val compactionStub = new CompactionSegmentManagement {
override def compactionIntervals: Map[String, Long] = intervals
}
assert(intervals.contains("persistence_id-1") && compactionStub.mustCompact("persistence_id-1", 1))
assert(intervals.contains("persistence_id-2") && compactionStub.mustCompact("persistence_id-2", 1))
assert(intervals.contains("persistence_id-3") && compactionStub.mustCompact("persistence_id-3", 1))
assert(!intervals.contains("persistence_id-4") && !compactionStub.mustCompact("persistence_id-4", 1))
}
"ignore persistence ids whose compaction intervals are less or equal to zero" in {
val intervals = Map(
"persistence_id-1" 1L,
"persistence_id-2" 0L,
"persistence_id-3" -1L
)
val compactionStub = new CompactionSegmentManagement {
override def compactionIntervals: Map[String, Long] = intervals
}
assert(intervals("persistence_id-1") > 0 && compactionStub.mustCompact("persistence_id-1", 1))
assert(intervals("persistence_id-2") <= 0 && !compactionStub.mustCompact("persistence_id-2", 1))
assert(intervals("persistence_id-3") <= 0 && !compactionStub.mustCompact("persistence_id-3", 1))
}
"allow for wildcard configuration" in {
val intervals = Map(
"persistence_id-1" 1L,
"persistence_id-2" 1L,
"*" 1L
)
val compactionStub = new CompactionSegmentManagement {
override def compactionIntervals: Map[String, Long] = intervals
}
assert(intervals.contains("persistence_id-1") && compactionStub.mustCompact("persistence_id-1", 1))
assert(intervals.contains("persistence_id-2") && compactionStub.mustCompact("persistence_id-2", 1))
assert(!intervals.contains("persistence_id-3") && compactionStub.mustCompact("persistence_id-3", 1))
}
"not permit compaction before thresholds are exceeded" in {
val namedIntervals = Map("persistence_id-1" → 5L, "persistence_id-2" → 4L)
val intervals = namedIntervals + ("*" → 3L)
val compactionStub = new CompactionSegmentManagement {
override def compactionIntervals: Map[String, Long] = intervals
}
val expectedIntervals = namedIntervals + ("persistence_id-3" → 3L, "persistence_id-4" → 3L)
for ((id, interval) ← expectedIntervals) {
var segment = 0
for (i ← 0L.until(4 * interval)) {
if ((i + 1) % interval == 0) {
assert(compactionStub.mustCompact(id, i), s"must compact for [$id] when toSeqNr is [$i]")
segment += 1
compactionStub.updateCompactionSegment(id, segment)
} else {
assert(!compactionStub.mustCompact(id, i), s"must not compact for [$id] when toSeqNr is [$i]")
}
assert(compactionStub.compactionSegment(id, i) == segment, s"for [$id] when toSeqNr is [$i]")
}
}
}
"keep track of latest segment with respect to the limit imposed by the upper limit of deletion" in {
val id = "persistence_id-1"
val interval = 5L
val compactionStub = new CompactionSegmentManagement {
override def compactionIntervals: Map[String, Long] = Map(id → interval)
}
val smallJump = interval / 2
val midJump = interval
val bigJump = interval * 2
var toSeqNr = smallJump
var segment = 0
assert(!compactionStub.mustCompact(id, toSeqNr))
assert(compactionStub.compactionSegment(id, toSeqNr) == segment)
toSeqNr += midJump
assert(compactionStub.mustCompact(id, toSeqNr))
segment += 1
assert(compactionStub.compactionSegment(id, toSeqNr) == segment)
compactionStub.updateCompactionSegment(id, segment)
toSeqNr += bigJump
assert(compactionStub.mustCompact(id, toSeqNr))
segment += 2
assert(compactionStub.compactionSegment(id, toSeqNr) == segment)
}
}
}

View file

@@ -0,0 +1,206 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.persistence.journal.leveldb
import java.io.File
import akka.actor.{ ActorLogging, ActorRef, ActorSystem, Props }
import akka.persistence.journal.leveldb.JournalCompactionSpec.EventLogger._
import akka.persistence.journal.leveldb.JournalCompactionSpec.SpecComponentBuilder
import akka.persistence.{ DeleteMessagesSuccess, PersistenceSpec, PersistentActor }
import akka.testkit.TestProbe
import com.typesafe.config.Config
import org.apache.commons.io.FileUtils
import scala.util.Random
class JournalNoCompactionSpec extends JournalCompactionSpecBase(SpecComponentBuilder("leveldb-JournalNoCompactionSpec")) {
"A LevelDB-based persistent actor" must {
"NOT compact the journal if compaction is not activated by configuration" in {
val watcher = TestProbe("watcher")
val logger = createLogger(watcher.ref)
val totalMessages = 2000
val deletionBatchSize = 500
var oldJournalSize = 0L
for (i ← 0L.until(totalMessages)) {
logger ! Generate
watcher.expectMsg(Generated(i))
}
var newJournalSize = calculateJournalSize()
assert(oldJournalSize < newJournalSize)
var deletionSeqNr = deletionBatchSize - 1
while (deletionSeqNr < totalMessages) {
logger ! Delete(deletionSeqNr)
watcher.expectMsg(DeleteMessagesSuccess(deletionSeqNr))
oldJournalSize = newJournalSize
newJournalSize = calculateJournalSize()
assert(oldJournalSize <= newJournalSize)
deletionSeqNr += deletionBatchSize
}
}
}
}
class JournalCompactionSpec extends JournalCompactionSpecBase(SpecComponentBuilder("leveldb-JournalCompactionSpec", 500)) {
"A LevelDB-based persistent actor" must {
"compact the journal upon message deletions of configured persistence ids" in {
val watcher = TestProbe("watcher")
val logger = createLogger(watcher.ref)
val totalMessages = 4 * builder.compactionInterval // 2000
val deletionBatchSize = builder.compactionInterval // 500
var oldJournalSize = 0L
for (i ← 0L.until(totalMessages)) {
logger ! Generate
watcher.expectMsg(Generated(i))
}
var newJournalSize = calculateJournalSize()
assert(oldJournalSize < newJournalSize)
var deletionSeqNr = deletionBatchSize - 1
while (deletionSeqNr < totalMessages) {
logger ! Delete(deletionSeqNr)
watcher.expectMsg(DeleteMessagesSuccess(deletionSeqNr))
oldJournalSize = newJournalSize
newJournalSize = calculateJournalSize()
assert(oldJournalSize > newJournalSize)
deletionSeqNr += deletionBatchSize
}
}
}
}
class JournalCompactionThresholdSpec extends JournalCompactionSpecBase(SpecComponentBuilder("leveldb-JournalCompactionThresholdSpec", 500)) {
"A LevelDB-based persistent actor" must {
"compact the journal only after the threshold implied by the configured compaction interval has been exceeded" in {
val watcher = TestProbe("watcher")
val logger = createLogger(watcher.ref)
val totalMessages = 2 * builder.compactionInterval // 1000
val deletionBatchSize = builder.compactionInterval // 500
var oldJournalSize = 0L
for (i ← 0L.until(totalMessages)) {
logger ! Generate
watcher.expectMsg(Generated(i))
}
var newJournalSize = calculateJournalSize()
assert(oldJournalSize < newJournalSize)
var deletionSeqNr = deletionBatchSize - 2
while (deletionSeqNr < totalMessages) {
logger ! Delete(deletionSeqNr)
watcher.expectMsg(DeleteMessagesSuccess(deletionSeqNr))
oldJournalSize = newJournalSize
newJournalSize = calculateJournalSize()
if (deletionSeqNr < builder.compactionInterval - 1) {
assert(oldJournalSize <= newJournalSize)
} else {
assert(oldJournalSize > newJournalSize)
}
deletionSeqNr += deletionBatchSize
}
}
}
}
abstract class JournalCompactionSpecBase(val builder: SpecComponentBuilder) extends PersistenceSpec(builder.config) {
def createLogger(watcher: ActorRef): ActorRef = builder.createLogger(system, watcher)
def calculateJournalSize(): Long = FileUtils.sizeOfDirectory(journalDir)
def journalDir: File = {
val relativePath = system.settings.config.getString("akka.persistence.journal.leveldb.dir")
new File(relativePath).getAbsoluteFile
}
}
object JournalCompactionSpec {
class SpecComponentBuilder(val specId: String, val compactionInterval: Long) {
def config: Config = {
PersistenceSpec.config("leveldb", specId, extraConfig = Some(
s"""
| akka.persistence.journal.leveldb.compaction-intervals.$specId = $compactionInterval
""".stripMargin
))
}
def createLogger(system: ActorSystem, watcher: ActorRef): ActorRef = {
system.actorOf(EventLogger.props(specId, watcher), "logger")
}
}
object SpecComponentBuilder {
def apply(specId: String): SpecComponentBuilder = apply(specId, 0)
def apply(specId: String, compactionInterval: Long): SpecComponentBuilder = {
new SpecComponentBuilder(specId, compactionInterval)
}
}
object EventLogger {
case object Generate
case class Generated(seqNr: Long)
case class Delete(toSeqNr: Long)
case class Event(seqNr: Long, payload: String)
def props(specId: String, watcher: ActorRef): Props = Props(classOf[EventLogger], specId, watcher)
}
class EventLogger(specId: String, watcher: ActorRef) extends PersistentActor with ActorLogging {
import EventLogger._
override def receiveRecover: Receive = {
case Event(seqNr, _) ⇒ log.info("Recovered event {}", seqNr)
}
override def receiveCommand: Receive = {
case Generate ⇒
persist(Event(lastSequenceNr, randomText()))(onEventPersisted)
case Delete(toSeqNr) ⇒
log.info("Deleting messages up to {}", toSeqNr)
deleteMessages(toSeqNr)
case evt: DeleteMessagesSuccess ⇒
watcher ! evt
}
override def persistenceId: String = specId
private def onEventPersisted(evt: Event): Unit = {
watcher ! Generated(evt.seqNr)
}
private def randomText(): String = Random.nextString(1024)
}
}