diff --git a/akka-docs/src/main/paradox/scala/persistence.md b/akka-docs/src/main/paradox/scala/persistence.md index d7bbd3827d..f18f1f09fc 100644 --- a/akka-docs/src/main/paradox/scala/persistence.md +++ b/akka-docs/src/main/paradox/scala/persistence.md @@ -1271,6 +1271,13 @@ directory. This location can be changed by configuration where the specified pat With this plugin, each actor system runs its own private LevelDB instance. +One peculiarity of LevelDB is that the deletion operation does not remove messages from the journal, but adds +a "tombstone" for each deleted message instead. In the case of heavy journal usage, especially one including frequent +deletes, this may be an issue as users may find themselves dealing with continuously increasing journal sizes. To +this end, LevelDB offers a special journal compaction function that is exposed via the following configuration: + +@@snip [PersistencePluginDocSpec.scala]($code$/scala/docs/persistence/PersistencePluginDocSpec.scala) { #compaction-intervals-config } + ### Shared LevelDB journal diff --git a/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala b/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala index b42043d3bc..56a8a517f0 100644 --- a/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala +++ b/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala @@ -47,6 +47,18 @@ object PersistencePluginDocSpec { //#native-config akka.persistence.journal.leveldb.native = off //#native-config + + //#compaction-intervals-config + # Number of deleted messages per persistence id that will trigger journal compaction + akka.persistence.journal.leveldb.compaction-intervals { + persistence-id-1 = 100 + persistence-id-2 = 200 + # ... + persistence-id-N = 1000 + # use wildcards to match unspecified persistence ids, if any + "*" = 250 + } + //#compaction-intervals-config """ } diff --git a/akka-persistence/src/main/mima-filters/2.5.4.backwards.excludes b/akka-persistence/src/main/mima-filters/2.5.4.backwards.excludes new file mode 100644 index 0000000000..820524fa05 --- /dev/null +++ b/akka-persistence/src/main/mima-filters/2.5.4.backwards.excludes @@ -0,0 +1,14 @@ +# LevelDB journal compaction #21677 +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.journal.leveldb.LeveldbStore.compactionIntervals") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.journal.leveldb.LeveldbStore.akka$persistence$journal$leveldb$LeveldbStore$_setter_$compactionIntervals_=") +ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.compactionLimit") +ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.compactionSegment") +ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.akka$persistence$journal$leveldb$CompactionSegmentManagement$$latestCompactionSegments") +ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.compactionIntervals") +ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.updateCompactionSegment") +ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.akka$persistence$journal$leveldb$CompactionSegmentManagement$$latestCompactionSegments_=") +ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.CompactionSegmentManagement.mustCompact") +ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.actor.ActorLogging.akka$actor$ActorLogging$$_log_=") +ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.actor.ActorLogging.log") +ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.actor.ActorLogging.akka$actor$ActorLogging$$_log") +ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.persistence.journal.leveldb.LeveldbCompaction.receiveCompactionInternal") diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index 7fdd0bb6b2..25e629903d 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -244,6 +244,9 @@ akka.persistence.journal.leveldb { checksum = off # Native LevelDB (via JNI) or LevelDB Java port. native = on + # Number of deleted messages per persistence id that will trigger journal compaction + compaction-intervals { + } } # Shared LevelDB journal plugin (for testing only). @@ -268,6 +271,9 @@ akka.persistence.journal.leveldb-shared { checksum = off # Native LevelDB (via JNI) or LevelDB Java port. native = on + # Number of deleted messages per persistence id that will trigger journal compaction + compaction-intervals { + } } } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbCompaction.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbCompaction.scala new file mode 100644 index 0000000000..9652548080 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbCompaction.scala @@ -0,0 +1,97 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package akka.persistence.journal.leveldb + +import akka.actor.{ Actor, ActorLogging } + +private[persistence] object LeveldbCompaction { + + case class TryCompactLeveldb(persistenceId: String, toSeqNr: Long) +} + +/** + * INTERNAL API. + * + * Exposure of LevelDB compaction capability to reduce journal size upon message deletions. + */ +private[persistence] trait LeveldbCompaction extends Actor with ActorLogging with CompactionSegmentManagement { + this: LeveldbStore ⇒ + + import Key._ + import LeveldbCompaction._ + + def receiveCompactionInternal: Receive = { + case TryCompactLeveldb(persistenceId, toSeqNr) ⇒ + tryCompactOnDelete(persistenceId, toSeqNr) + } + + private def tryCompactOnDelete(persistenceId: String, toSeqNr: Long): Unit = { + if (mustCompact(persistenceId, toSeqNr)) { + val limit = compactionLimit(persistenceId, toSeqNr) + log.info("Starting compaction for persistence id [{}] up to sequence number [{}]", persistenceId, limit) + val start = keyToBytes(Key(numericId(persistenceId), 0, 0)) + val end = keyToBytes(Key(numericId(persistenceId), limit, 0)) + leveldb.compactRange(start, end) + updateCompactionSegment(persistenceId, compactionSegment(persistenceId, limit)) + log.info("Compaction for persistence id [{}] up to sequence number [{}] is complete", persistenceId, limit) + } else { + log.debug("No compaction required yet for persistence id [{}] up to sequence number [{}]", persistenceId, toSeqNr) + } + } +} + +/** + * INTERNAL API. + * + * Calculates and stores info of compacted "segments" per persistence id. + * + * Assuming a compaction interval N for a given persistence id, then compaction is to be performed + * for segments of message sequence numbers according to the following pattern: + * + * [0, N), [N, 2*N), ... , [m*N, (m + 1)*N) + * + * Once deletion is performed up to a sequence number 'toSeqNr', then 'toSeqNr' will be used to determine the + * rightmost segment up to which compaction will be performed. Eligible segments for compaction are only + * considered to be those which include sequence numbers up to 'toSeqNr' AND whose size is equal to N (the compaction + * interval). This rule implies that if 'toSeqNr' spans an incomplete portion of a rightmost segment, then + * that segment will be omitted from the pending compaction, and will be included into the next one. + * + */ +private[persistence] trait CompactionSegmentManagement { + + import CompactionSegmentManagement._ + + private[this] var latestCompactionSegments = Map.empty[String, Long] + + def compactionIntervals: Map[String, Long] + + def updateCompactionSegment(persistenceId: String, compactionSegment: Long): Unit = { + latestCompactionSegments += persistenceId → compactionSegment + } + + def compactionLimit(persistenceId: String, toSeqNr: Long): Long = { + val interval = compactionInterval(persistenceId) + (toSeqNr + 1) / interval * interval - 1 + } + + def compactionSegment(persistenceId: String, toSeqNr: Long): Long = (toSeqNr + 1) / compactionInterval(persistenceId) + + def mustCompact(persistenceId: String, toSeqNr: Long): Boolean = + isCompactionEnabled(persistenceId) && isCompactionRequired(persistenceId, toSeqNr) + + private def isCompactionEnabled(persistenceId: String): Boolean = compactionInterval(persistenceId) > 0L + + private def isCompactionRequired(persistenceId: String, toSeqNr: Long): Boolean = + compactionSegment(persistenceId, toSeqNr) > latestCompactionSegment(persistenceId) + + private def latestCompactionSegment(persistenceId: String): Long = latestCompactionSegments.getOrElse(persistenceId, 0L) + + private def compactionInterval(persistenceId: String): Long = + compactionIntervals.getOrElse(persistenceId, compactionIntervals.getOrElse(Wildcard, 0L)) +} + +private[persistence] object CompactionSegmentManagement { + val Wildcard = "*" +} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala index cca088d245..14ef9528a1 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala @@ -29,7 +29,7 @@ private[persistence] class LeveldbJournal(cfg: Config) extends AsyncWriteJournal if (cfg ne LeveldbStore.emptyConfig) cfg else context.system.settings.config.getConfig("akka.persistence.journal.leveldb") - override def receivePluginInternal: Receive = { + override def receivePluginInternal: Receive = receiveCompactionInternal orElse { case r @ ReplayTaggedMessages(fromSequenceNr, toSequenceNr, max, tag, replyTo) ⇒ import context.dispatcher val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala index 4c954d65e4..900a6eb81e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala @@ -15,20 +15,25 @@ import akka.serialization.SerializationExtension import org.iq80.leveldb._ import scala.collection.immutable +import scala.collection.JavaConverters._ import scala.util._ import scala.concurrent.Future import scala.util.control.NonFatal import akka.persistence.journal.Tagged -import com.typesafe.config.{ Config, ConfigFactory } +import com.typesafe.config.{ Config, ConfigFactory, ConfigObject } private[persistence] object LeveldbStore { val emptyConfig = ConfigFactory.empty() + + def toCompactionIntervalMap(obj: ConfigObject): Map[String, Long] = { + obj.unwrapped().asScala.map(entry ⇒ (entry._1, java.lang.Long.parseLong(entry._2.toString))).toMap + } } /** * INTERNAL API. */ -private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with LeveldbIdMapping with LeveldbRecovery { +private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with LeveldbIdMapping with LeveldbRecovery with LeveldbCompaction { def prepareConfig: Config @@ -40,6 +45,7 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with val leveldbWriteOptions = new WriteOptions().sync(config.getBoolean("fsync")).snapshot(false) val leveldbDir = new File(config.getString("dir")) var leveldb: DB = _ + override val compactionIntervals: Map[String, Long] = LeveldbStore.toCompactionIntervalMap(config.getObject("compaction-intervals")) private val persistenceIdSubscribers = new mutable.HashMap[String, mutable.Set[ActorRef]] with mutable.MultiMap[String, ActorRef] private val tagSubscribers = new mutable.HashMap[String, mutable.Set[ActorRef]] with mutable.MultiMap[String, ActorRef] @@ -112,6 +118,8 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with batch.delete(keyToBytes(Key(nid, sequenceNr, 0))) sequenceNr += 1 } + + self ! LeveldbCompaction.TryCompactLeveldb(persistenceId, toSeqNr) } } } catch { diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala index 3e8714061f..b4ba3dbdbb 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala @@ -28,7 +28,7 @@ class SharedLeveldbStore(cfg: Config) extends LeveldbStore { if (cfg ne LeveldbStore.emptyConfig) cfg.getConfig("store") else context.system.settings.config.getConfig("akka.persistence.journal.leveldb-shared.store") - def receive = { + def receive = receiveCompactionInternal orElse { case WriteMessages(messages) ⇒ // TODO it would be nice to DRY this with AsyncWriteJournal, but this is using // AsyncWriteProxy message protocol diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/leveldb/CompactionSegmentManagementSpec.scala b/akka-persistence/src/test/scala/akka/persistence/journal/leveldb/CompactionSegmentManagementSpec.scala new file mode 100644 index 0000000000..ab45debcc2 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/journal/leveldb/CompactionSegmentManagementSpec.scala @@ -0,0 +1,105 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package akka.persistence.journal.leveldb + +import org.scalatest.WordSpec + +class CompactionSegmentManagementSpec extends WordSpec { + + "A CompactionSegmentManagement compatible object" must { + "ignore persistence ids without declared compaction intervals" in { + val intervals = Map( + "persistence_id-1" → 1L, + "persistence_id-2" → 1L, + "persistence_id-3" → 1L + ) + val compactionStub = new CompactionSegmentManagement { + override def compactionIntervals: Map[String, Long] = intervals + } + assert(intervals.contains("persistence_id-1") && compactionStub.mustCompact("persistence_id-1", 1)) + assert(intervals.contains("persistence_id-2") && compactionStub.mustCompact("persistence_id-2", 1)) + assert(intervals.contains("persistence_id-3") && compactionStub.mustCompact("persistence_id-3", 1)) + assert(!intervals.contains("persistence_id-4") && !compactionStub.mustCompact("persistence_id-4", 1)) + } + + "ignore persistence ids whose compaction intervals are less or equal to zero" in { + val intervals = Map( + "persistence_id-1" → 1L, + "persistence_id-2" → 0L, + "persistence_id-3" → -1L + ) + val compactionStub = new CompactionSegmentManagement { + override def compactionIntervals: Map[String, Long] = intervals + } + assert(intervals("persistence_id-1") > 0 && compactionStub.mustCompact("persistence_id-1", 1)) + assert(intervals("persistence_id-2") <= 0 && !compactionStub.mustCompact("persistence_id-2", 1)) + assert(intervals("persistence_id-3") <= 0 && !compactionStub.mustCompact("persistence_id-3", 1)) + } + + "allow for wildcard configuration" in { + val intervals = Map( + "persistence_id-1" → 1L, + "persistence_id-2" → 1L, + "*" → 1L + ) + val compactionStub = new CompactionSegmentManagement { + override def compactionIntervals: Map[String, Long] = intervals + } + assert(intervals.contains("persistence_id-1") && compactionStub.mustCompact("persistence_id-1", 1)) + assert(intervals.contains("persistence_id-2") && compactionStub.mustCompact("persistence_id-2", 1)) + assert(!intervals.contains("persistence_id-3") && compactionStub.mustCompact("persistence_id-3", 1)) + } + + "not permit compaction before thresholds are exceeded" in { + val namedIntervals = Map("persistence_id-1" → 5L, "persistence_id-2" → 4L) + val intervals = namedIntervals + ("*" → 3L) + val compactionStub = new CompactionSegmentManagement { + override def compactionIntervals: Map[String, Long] = intervals + } + val expectedIntervals = namedIntervals + ("persistence_id-3" → 3L, "persistence_id-4" → 3L) + + for ((id, interval) ← expectedIntervals) { + var segment = 0 + + for (i ← 0L.until(4 * interval)) { + if ((i + 1) % interval == 0) { + assert(compactionStub.mustCompact(id, i), s"must compact for [$id] when toSeqNr is [$i]") + segment += 1 + compactionStub.updateCompactionSegment(id, segment) + } else { + assert(!compactionStub.mustCompact(id, i), s"must not compact for [$id] when toSeqNr is [$i]") + } + + assert(compactionStub.compactionSegment(id, i) == segment, s"for [$id] when toSeqNr is [$i]") + } + } + } + + "keep track of latest segment with respect to the limit imposed by the upper limit of deletion" in { + val id = "persistence_id-1" + val interval = 5L + val compactionStub = new CompactionSegmentManagement { + override def compactionIntervals: Map[String, Long] = Map(id → interval) + } + val smallJump = interval / 2 + val midJump = interval + val bigJump = interval * 2 + var toSeqNr = smallJump + var segment = 0 + assert(!compactionStub.mustCompact(id, toSeqNr)) + assert(compactionStub.compactionSegment(id, toSeqNr) == segment) + toSeqNr += midJump + assert(compactionStub.mustCompact(id, toSeqNr)) + segment += 1 + assert(compactionStub.compactionSegment(id, toSeqNr) == segment) + compactionStub.updateCompactionSegment(id, segment) + toSeqNr += bigJump + assert(compactionStub.mustCompact(id, toSeqNr)) + segment += 2 + assert(compactionStub.compactionSegment(id, toSeqNr) == segment) + } + } + +} diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/leveldb/JournalCompactionSpec.scala b/akka-persistence/src/test/scala/akka/persistence/journal/leveldb/JournalCompactionSpec.scala new file mode 100644 index 0000000000..f17bc5d039 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/journal/leveldb/JournalCompactionSpec.scala @@ -0,0 +1,206 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package akka.persistence.journal.leveldb + +import java.io.File + +import akka.actor.{ ActorLogging, ActorRef, ActorSystem, Props } +import akka.persistence.journal.leveldb.JournalCompactionSpec.EventLogger._ +import akka.persistence.journal.leveldb.JournalCompactionSpec.SpecComponentBuilder +import akka.persistence.{ DeleteMessagesSuccess, PersistenceSpec, PersistentActor } +import akka.testkit.TestProbe +import com.typesafe.config.Config +import org.apache.commons.io.FileUtils + +import scala.util.Random + +class JournalNoCompactionSpec extends JournalCompactionSpecBase(SpecComponentBuilder("leveldb-JournalNoCompactionSpec")) { + + "A LevelDB-based persistent actor" must { + "NOT compact the journal if compaction is not activated by configuration" in { + val watcher = TestProbe("watcher") + val logger = createLogger(watcher.ref) + val totalMessages = 2000 + val deletionBatchSize = 500 + var oldJournalSize = 0L + + for (i ← 0L.until(totalMessages)) { + logger ! Generate + watcher.expectMsg(Generated(i)) + } + + var newJournalSize = calculateJournalSize() + assert(oldJournalSize < newJournalSize) + + var deletionSeqNr = deletionBatchSize - 1 + + while (deletionSeqNr < totalMessages) { + logger ! Delete(deletionSeqNr) + watcher.expectMsg(DeleteMessagesSuccess(deletionSeqNr)) + + oldJournalSize = newJournalSize + newJournalSize = calculateJournalSize() + assert(oldJournalSize <= newJournalSize) + + deletionSeqNr += deletionBatchSize + } + } + } +} + +class JournalCompactionSpec extends JournalCompactionSpecBase(SpecComponentBuilder("leveldb-JournalCompactionSpec", 500)) { + + "A LevelDB-based persistent actor" must { + "compact the journal upon message deletions of configured persistence ids" in { + val watcher = TestProbe("watcher") + val logger = createLogger(watcher.ref) + val totalMessages = 4 * builder.compactionInterval // 2000 + val deletionBatchSize = builder.compactionInterval // 500 + var oldJournalSize = 0L + + for (i ← 0L.until(totalMessages)) { + logger ! Generate + watcher.expectMsg(Generated(i)) + } + + var newJournalSize = calculateJournalSize() + assert(oldJournalSize < newJournalSize) + + var deletionSeqNr = deletionBatchSize - 1 + + while (deletionSeqNr < totalMessages) { + logger ! Delete(deletionSeqNr) + watcher.expectMsg(DeleteMessagesSuccess(deletionSeqNr)) + + oldJournalSize = newJournalSize + newJournalSize = calculateJournalSize() + assert(oldJournalSize > newJournalSize) + + deletionSeqNr += deletionBatchSize + } + } + } +} + +class JournalCompactionThresholdSpec extends JournalCompactionSpecBase(SpecComponentBuilder("leveldb-JournalCompactionThresholdSpec", 500)) { + + "A LevelDB-based persistent actor" must { + "compact the journal only after the threshold implied by the configured compaction interval has been exceeded" in { + val watcher = TestProbe("watcher") + val logger = createLogger(watcher.ref) + val totalMessages = 2 * builder.compactionInterval // 1000 + val deletionBatchSize = builder.compactionInterval // 500 + var oldJournalSize = 0L + + for (i ← 0L.until(totalMessages)) { + logger ! Generate + watcher.expectMsg(Generated(i)) + } + + var newJournalSize = calculateJournalSize() + assert(oldJournalSize < newJournalSize) + + var deletionSeqNr = deletionBatchSize - 2 + + while (deletionSeqNr < totalMessages) { + logger ! Delete(deletionSeqNr) + watcher.expectMsg(DeleteMessagesSuccess(deletionSeqNr)) + + oldJournalSize = newJournalSize + newJournalSize = calculateJournalSize() + + if (deletionSeqNr < builder.compactionInterval - 1) { + assert(oldJournalSize <= newJournalSize) + } else { + assert(oldJournalSize > newJournalSize) + } + + deletionSeqNr += deletionBatchSize + } + } + } +} + +abstract class JournalCompactionSpecBase(val builder: SpecComponentBuilder) extends PersistenceSpec(builder.config) { + + def createLogger(watcher: ActorRef): ActorRef = builder.createLogger(system, watcher) + + def calculateJournalSize(): Long = FileUtils.sizeOfDirectory(journalDir) + + def journalDir: File = { + val relativePath = system.settings.config.getString("akka.persistence.journal.leveldb.dir") + new File(relativePath).getAbsoluteFile + } + +} + +object JournalCompactionSpec { + + class SpecComponentBuilder(val specId: String, val compactionInterval: Long) { + + def config: Config = { + PersistenceSpec.config("leveldb", specId, extraConfig = Some( + s""" + | akka.persistence.journal.leveldb.compaction-intervals.$specId = $compactionInterval + """.stripMargin + )) + } + + def createLogger(system: ActorSystem, watcher: ActorRef): ActorRef = { + system.actorOf(EventLogger.props(specId, watcher), "logger") + } + + } + + object SpecComponentBuilder { + + def apply(specId: String): SpecComponentBuilder = apply(specId, 0) + + def apply(specId: String, compactionInterval: Long): SpecComponentBuilder = { + new SpecComponentBuilder(specId, compactionInterval) + } + } + + object EventLogger { + + case object Generate + + case class Generated(seqNr: Long) + + case class Delete(toSeqNr: Long) + + case class Event(seqNr: Long, payload: String) + + def props(specId: String, watcher: ActorRef): Props = Props(classOf[EventLogger], specId, watcher) + } + + class EventLogger(specId: String, watcher: ActorRef) extends PersistentActor with ActorLogging { + + import EventLogger._ + + override def receiveRecover: Receive = { + case Event(seqNr, _) ⇒ log.info("Recovered event {}", seqNr) + } + + override def receiveCommand: Receive = { + case Generate ⇒ + persist(Event(lastSequenceNr, randomText()))(onEventPersisted) + case Delete(toSeqNr) ⇒ + log.info("Deleting messages up to {}", toSeqNr) + deleteMessages(toSeqNr) + case evt: DeleteMessagesSuccess ⇒ + watcher ! evt + } + + override def persistenceId: String = specId + + private def onEventPersisted(evt: Event): Unit = { + watcher ! Generated(evt.seqNr) + } + + private def randomText(): String = Random.nextString(1024) + } + +}