Merge pull request #18195 from akka/wip-17837-writerUid-patriknw

+per #17837 Detect (and repair) corrupt event streams
This commit is contained in:
Patrik Nordwall 2015-08-14 15:04:15 +02:00
commit 8652e37711
9 changed files with 566 additions and 94 deletions

View file

@ -386,6 +386,12 @@ have explicitly relied on this behaviour, however if you find yourself with an a
should rewrite it to explicitly store the ``ActorPath`` of where such replies during replay may have to be sent to,
instead of relying on the sender reference during replay.
max-message-batch-size config
-----------------------------
Configuration property ``akka.persistence.journal.max-message-batch-size`` has been moved into the plugin configuration
section, to allow different values for different journal plugins. See ``reference.conf``.
Persistence Plugin APIs
=======================

View file

@ -21,10 +21,6 @@ akka.persistence {
# Absolute path to the journal plugin configuration entry used by persistent actor or view by default.
# Persistent actor or view can override `journalPluginId` method in order to rely on a different journal plugin.
plugin = ""
# Maximum size of a persistent message batch written to the journal.
max-message-batch-size = 200
# Maximum size of a deletion batch written to the journal.
max-deletion-batch-size = 10000
}
# Default snapshot store settings.
snapshot-store {
@ -82,11 +78,65 @@ akka.persistence {
}
}
}
# CircuitBreaker setting if not defined in plugin config section named circuit-breaker
default-circuit-breaker {
max-failures = 10
call-timeout = 10s
reset-timeout = 30s
# Fallback settings for journal plugin configurations.
# These settings are used if they are not defined in plugin config section.
journal-plugin-fallback {
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
# Dispatcher for message replay.
replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher"
# Set this to true if plugin actor has a constructor which expects plugin
# configuration entry.
inject-config = false
# Maximum size of a persistent message batch written to the journal.
max-message-batch-size = 200
circuit-breaker {
max-failures = 10
call-timeout = 10s
reset-timeout = 30s
}
# The replay filter can detect a corrupt event stream by inspecting
# sequence numbers and writerUuid when replaying events.
replay-filter {
# What the filter should do when detecting invalid events.
# Supported values:
# `repair-by-discard-old` : discard events from old writers,
# warning is logged
# `fail` : fail the replay, error is logged
# `warn` : log warning but emit events untouched
# `off` : disable this feature completely
mode = repair-by-discard-old
# It uses a look ahead buffer for analyzing the events.
# This defines the size (in number of events) of the buffer.
window-size = 100
# How many old writerUuid to remember
max-old-writers = 10
}
}
# Fallback settings for snapshot store plugin configurations
# These settings are used if they are not defined in plugin config section.
snapshot-store-plugin-fallback {
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
inject-config = false
circuit-breaker {
max-failures = 5
call-timeout = 20s
reset-timeout = 60s
}
}
}
@ -106,6 +156,7 @@ akka.actor {
}
}
###################################################
# Persistence plugins included with the extension #
###################################################
@ -132,12 +183,6 @@ akka.persistence.snapshot-store.local {
# yet older snapshot files are available. Each recovery attempt will try
# to recover using an older than previously failed-on snapshot file (if any are present).
max-load-attempts = 3
# CircuitBreaker settings
circuit-breaker {
max-failures = 5
call-timeout = 20s
reset-timeout = 60s
}
}
# LevelDB journal plugin.
@ -157,12 +202,6 @@ akka.persistence.journal.leveldb {
checksum = off
# Native LevelDB (via JNI) or LevelDB Java port.
native = on
# CircuitBreaker settings
circuit-breaker {
max-failures = 10
call-timeout = 10s
reset-timeout = 30s
}
}
# Shared LevelDB journal plugin (for testing only).

View file

@ -51,7 +51,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
private val writerUuid = UUID.randomUUID.toString
private var journalBatch = Vector.empty[PersistentEnvelope]
private val maxMessageBatchSize = extension.settings.journal.maxMessageBatchSize
private val maxMessageBatchSize = extension.journalConfigFor(journalPluginId).getInt("max-message-batch-size")
private var writeInProgress = false
private var sequenceNr: Long = 0L
private var _lastSequenceNr: Long = 0L

View file

@ -5,28 +5,20 @@
package akka.persistence
import java.util.concurrent.atomic.AtomicReference
import akka.actor._
import akka.dispatch.Dispatchers
import akka.event.{ Logging, LoggingAdapter }
import akka.persistence.journal.{ AsyncWriteJournal, EventAdapters, IdentityEventAdapters }
import akka.persistence.journal.{ AsyncWriteJournal, EventAdapters, IdentityEventAdapters, ReplayFilter }
import akka.util.Helpers.ConfigOps
import com.typesafe.config.Config
import scala.annotation.tailrec
import scala.concurrent.duration._
import java.util.Locale
/**
* Persistence configuration.
*/
final class PersistenceSettings(config: Config) {
object journal {
val maxMessageBatchSize: Int =
config.getInt("journal.max-message-batch-size")
val maxDeletionBatchSize: Int =
config.getInt("journal.max-deletion-batch-size")
}
object view {
val autoUpdate: Boolean =
@ -124,7 +116,8 @@ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider {
def createExtension(system: ExtendedActorSystem): Persistence = new Persistence(system)
def lookup() = Persistence
/** INTERNAL API. */
private[persistence] case class PluginHolder(actor: ActorRef, adapters: EventAdapters) extends Extension
private[persistence] case class PluginHolder(actor: ActorRef, adapters: EventAdapters, config: Config)
extends Extension
}
/**
@ -164,28 +157,23 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
/** Check for default or missing identity. */
private def isEmpty(text: String) = text == null || text.length == 0
/** Discovered persistence journal plugins. */
/** Discovered persistence journal and snapshot store plugins. */
private val journalPluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty)
/** Discovered persistence snapshot store plugins. */
private val snapshotPluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty)
private val journalFallbackConfigPath = "akka.persistence.journal-plugin-fallback"
private val snapshotStoreFallbackConfigPath = "akka.persistence.snapshot-store-plugin-fallback"
/**
* Returns an [[akka.persistence.journal.EventAdapters]] object which serves as a per-journal collection of bound event adapters.
* If no adapters are registered for a given journal the EventAdapters object will simply return the identity
* adapter for each class, otherwise the most specific adapter matching a given class will be returned.
*/
@tailrec final def adaptersFor(journalPluginId: String): EventAdapters = {
final def adaptersFor(journalPluginId: String): EventAdapters = {
val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId
val extensionIdMap = journalPluginExtensionId.get
extensionIdMap.get(configPath) match {
case Some(extensionId)
extensionId(system).adapters
case None
val extensionId = new PluginHolderExtensionId(configPath)
journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
adaptersFor(journalPluginId) // Recursive invocation.
}
pluginHolderFor(configPath, journalFallbackConfigPath).adapters
}
/**
@ -202,22 +190,38 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
}
/**
* INTERNAL API
* Returns the plugin config identified by `pluginId`.
* When empty, looks in `akka.persistence.journal.plugin` to find configuration entry path.
* When configured, uses `journalPluginId` as absolute path to the journal configuration entry.
*/
private[akka] final def journalConfigFor(journalPluginId: String): Config = {
val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId
pluginHolderFor(configPath, journalFallbackConfigPath).config
}
/**
* INTERNAL API
* Looks up the plugin config by plugin's ActorRef.
*/
private[akka] final def configFor(journalPluginActor: ActorRef): Config =
journalPluginExtensionId.get().values.collectFirst {
case ext if ext(system).actor == journalPluginActor ext(system).config
} match {
case Some(conf) conf
case None throw new IllegalArgumentException(s"Unknow plugin actor $journalPluginActor")
}
/**
* INTERNAL API
* Returns a journal plugin actor identified by `journalPluginId`.
* When empty, looks in `akka.persistence.journal.plugin` to find configuration entry path.
* When configured, uses `journalPluginId` as absolute path to the journal configuration entry.
* Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`.
*/
@tailrec private[akka] final def journalFor(journalPluginId: String): ActorRef = {
private[akka] final def journalFor(journalPluginId: String): ActorRef = {
val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId
val extensionIdMap = journalPluginExtensionId.get
extensionIdMap.get(configPath) match {
case Some(extensionId)
extensionId(system).actor
case None
val extensionId = new PluginHolderExtensionId(configPath)
journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
journalFor(journalPluginId) // Recursive invocation.
}
pluginHolderFor(configPath, journalFallbackConfigPath).actor
}
/**
@ -228,29 +232,30 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
* When configured, uses `snapshotPluginId` as absolute path to the snapshot store configuration entry.
* Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`.
*/
@tailrec private[akka] final def snapshotStoreFor(snapshotPluginId: String): ActorRef = {
private[akka] final def snapshotStoreFor(snapshotPluginId: String): ActorRef = {
val configPath = if (isEmpty(snapshotPluginId)) defaultSnapshotPluginId else snapshotPluginId
val extensionIdMap = snapshotPluginExtensionId.get
pluginHolderFor(configPath, snapshotStoreFallbackConfigPath).actor
}
@tailrec private def pluginHolderFor(configPath: String, fallbackPath: String): PluginHolder = {
val extensionIdMap = journalPluginExtensionId.get
extensionIdMap.get(configPath) match {
case Some(extensionId)
extensionId(system).actor
extensionId(system)
case None
val extensionId = new PluginHolderExtensionId(configPath)
snapshotPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
snapshotStoreFor(snapshotPluginId) // Recursive invocation.
val extensionId = new PluginHolderExtensionId(configPath, fallbackPath)
journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
pluginHolderFor(configPath, fallbackPath) // Recursive invocation.
}
}
private def createPlugin(configPath: String): ActorRef = {
require(!isEmpty(configPath) && system.settings.config.hasPath(configPath),
s"'reference.conf' is missing persistence plugin config path: '$configPath'")
private def createPlugin(configPath: String, pluginConfig: Config): ActorRef = {
val pluginActorName = configPath
val pluginConfig = system.settings.config.getConfig(configPath)
val pluginClassName = pluginConfig.getString("class")
log.debug(s"Create plugin: $pluginActorName $pluginClassName")
val pluginClass = system.dynamicAccess.getClassFor[Any](pluginClassName).get
val pluginInjectConfig = if (pluginConfig.hasPath("inject-config")) pluginConfig.getBoolean("inject-config") else false
val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else DefaultPluginDispatcherId
val pluginInjectConfig = pluginConfig.getBoolean("inject-config")
val pluginDispatcherId = pluginConfig.getString("plugin-dispatcher")
val pluginActorArgs = if (pluginInjectConfig) List(pluginConfig) else Nil
val pluginActorProps = Props(Deploy(dispatcher = pluginDispatcherId), pluginClass, pluginActorArgs)
system.systemActorOf(pluginActorProps, pluginActorName)
@ -266,11 +271,16 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
private def id(ref: ActorRef) = ref.path.toStringWithoutAddress
private class PluginHolderExtensionId(configPath: String) extends ExtensionId[PluginHolder] {
private class PluginHolderExtensionId(configPath: String, fallbackPath: String) extends ExtensionId[PluginHolder] {
override def createExtension(system: ExtendedActorSystem): PluginHolder = {
val plugin = createPlugin(configPath)
val adapters = createAdapters(configPath)
PluginHolder(plugin, adapters)
require(!isEmpty(configPath) && system.settings.config.hasPath(configPath),
s"'reference.conf' is missing persistence plugin config path: '$configPath'")
val config: Config = system.settings.config.getConfig(configPath)
.withFallback(system.settings.config.getConfig(fallbackPath))
val plugin: ActorRef = createPlugin(configPath, config)
val adapters: EventAdapters = createAdapters(configPath)
PluginHolder(plugin, adapters, config)
}
}

View file

@ -14,6 +14,7 @@ import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }
import scala.util.control.NonFatal
import akka.pattern.CircuitBreaker
import java.util.Locale
/**
* Abstract journal, optimized for asynchronous, non-blocking writes.
@ -25,23 +26,31 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
private val extension = Persistence(context.system)
private val publish = extension.settings.internal.publishPluginCommands
private val config = extension.configFor(self)
private val breaker = {
val maxFailures = config.getInt("circuit-breaker.max-failures")
val callTimeout = config.getDuration("circuit-breaker.call-timeout", MILLISECONDS).millis
val resetTimeout = config.getDuration("circuit-breaker.reset-timeout", MILLISECONDS).millis
CircuitBreaker(context.system.scheduler, maxFailures, callTimeout, resetTimeout)
}
private val replayFilterMode: ReplayFilter.Mode =
config.getString("replay-filter.mode").toLowerCase(Locale.ROOT) match {
case "off" ReplayFilter.Disabled
case "repair-by-discard-old" ReplayFilter.RepairByDiscardOld
case "fail" ReplayFilter.Fail
case "warn" ReplayFilter.Warn
case other throw new IllegalArgumentException(
s"invalid replay-filter.mode [$other], supported values [off, repair, fail, warn]")
}
private def isReplayFilterEnabled: Boolean = replayFilterMode != ReplayFilter.Disabled
private val replayFilterWindowSize: Int = config.getInt("replay-filter.window-size")
private val replayFilterMaxOldWriters: Int = config.getInt("replay-filter.max-old-writers")
private val resequencer = context.actorOf(Props[Resequencer]())
private var resequencerCounter = 1L
private val breaker = {
val cfg = context.system.settings.config
val cbConfig =
if (cfg.hasPath(self.path.name + ".circuit-breaker"))
cfg.getConfig(self.path.name + ".circuit-breaker")
.withFallback(cfg.getConfig("akka.persistence.default-circuit-breaker"))
else cfg.getConfig("akka.persistence.default-circuit-breaker")
val maxFailures = cbConfig.getInt("max-failures")
val callTimeout = cbConfig.getDuration("call-timeout", MILLISECONDS).millis
val resetTimeout = cbConfig.getDuration("reset-timeout", MILLISECONDS).millis
CircuitBreaker(context.system.scheduler, maxFailures, callTimeout, resetTimeout)
}
final def receive = receiveWriteJournal.orElse[Any, Unit](receivePluginInternal)
final val receiveWriteJournal: Actor.Receive = {
@ -110,6 +119,10 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
}
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor)
val replyTo =
if (isReplayFilterEnabled) context.actorOf(ReplayFilter.props(persistentActor, replayFilterMode,
replayFilterWindowSize, replayFilterMaxOldWriters))
else persistentActor
breaker.withCircuitBreaker(asyncReadHighestSequenceNr(persistenceId, fromSequenceNr))
.flatMap { highSeqNr
@ -123,7 +136,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
asyncReplayMessages(persistenceId, fromSequenceNr, toSeqNr, max) { p
if (!p.deleted) // old records from 2.3 may still have the deleted flag
adaptFromJournal(p).foreach { adaptedPersistentRepr
persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender)
replyTo.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender)
}
}.map(_ highSeqNr)
}
@ -131,7 +144,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
highSeqNr RecoverySuccess(highSeqNr)
}.recover {
case e ReplayMessagesFailure(e)
}.pipeTo(persistentActor).onSuccess {
}.pipeTo(replyTo).onSuccess {
case _ if publish context.system.eventStream.publish(r)
}

View file

@ -0,0 +1,155 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal
import akka.actor.ActorRef
import akka.actor.Actor
import akka.persistence.JournalProtocol
import java.util.LinkedList
import akka.actor.Props
import akka.actor.ActorLogging
import scala.collection.mutable.LinkedHashSet
/**
* INTERNAL API
*
* Detect corrupt event stream during replay. It uses the the writerUuid and the
* sequenceNr in the replayed events to find events emitted by overlapping writers.
*/
private[akka] object ReplayFilter {
def props(
persistentActor: ActorRef,
mode: Mode,
windowSize: Int,
maxOldWriters: Int): Props = {
require(windowSize > 0, "windowSize must be > 0")
require(maxOldWriters > 0, "maxOldWriters must be > 0")
require(mode != Disabled, "mode must not be Disabled")
Props(new ReplayFilter(persistentActor, mode, windowSize, maxOldWriters))
}
sealed trait Mode
case object Fail extends Mode
case object Warn extends Mode
case object RepairByDiscardOld extends Mode
case object Disabled extends Mode
}
/**
* INTERNAL API
*/
private[akka] class ReplayFilter(persistentActor: ActorRef, mode: ReplayFilter.Mode,
windowSize: Int, maxOldWriters: Int)
extends Actor with ActorLogging {
import JournalProtocol._
import ReplayFilter.{ Warn, Fail, RepairByDiscardOld, Disabled }
val buffer = new LinkedList[ReplayedMessage]()
val oldWriters = LinkedHashSet.empty[String]
var writerUuid = ""
var seqNo = -1L
def receive = {
case r @ ReplayedMessage(persistent)
try {
if (buffer.size == windowSize) {
val msg = buffer.removeFirst()
persistentActor.tell(msg, Actor.noSender)
}
if (r.persistent.writerUuid == writerUuid) {
// from same writer
if (r.persistent.sequenceNr < seqNo) {
val errMsg = s"Invalid replayed event [${r.persistent.sequenceNr}] in wrong order from " +
s"writer [${r.persistent.writerUuid}] with persistenceId [${r.persistent.persistenceId}]"
logIssue(errMsg)
mode match {
case RepairByDiscardOld // discard
case Fail throw new IllegalStateException(errMsg)
case Warn buffer.add(r)
case Disabled throw new IllegalArgumentException("mode must not be Disabled")
}
} else {
// note that it is alright with == seqNo, since such may be emitted EventSeq
buffer.add(r)
seqNo = r.persistent.sequenceNr
}
} else if (oldWriters.contains(r.persistent.writerUuid)) {
// from old writer
val errMsg = s"Invalid replayed event [${r.persistent.sequenceNr}] from old " +
s"writer [${r.persistent.writerUuid}] with persistenceId [${r.persistent.persistenceId}]"
logIssue(errMsg)
mode match {
case RepairByDiscardOld // discard
case Fail throw new IllegalStateException(errMsg)
case Warn buffer.add(r)
case Disabled throw new IllegalArgumentException("mode must not be Disabled")
}
} else {
// from new writer
if (writerUuid != "")
oldWriters.add(writerUuid)
if (oldWriters.size > maxOldWriters)
oldWriters.remove(oldWriters.head)
writerUuid = r.persistent.writerUuid
seqNo = r.persistent.sequenceNr
// clear the buffer from messages from other writers with higher seqNo
val iter = buffer.iterator()
while (iter.hasNext()) {
val msg = iter.next()
if (msg.persistent.sequenceNr >= seqNo) {
val errMsg = s"Invalid replayed event [${msg.persistent.sequenceNr}] in buffer from old " +
s"writer [${msg.persistent.writerUuid}] with persistenceId [${msg.persistent.persistenceId}]"
logIssue(errMsg)
mode match {
case RepairByDiscardOld iter.remove() // discard
case Fail throw new IllegalStateException(errMsg)
case Warn // keep
case Disabled throw new IllegalArgumentException("mode must not be Disabled")
}
}
}
buffer.add(r)
}
} catch {
case e: IllegalStateException if mode == Fail fail(e)
}
case msg @ (_: RecoverySuccess | _: ReplayMessagesFailure)
sendBuffered()
persistentActor.tell(msg, Actor.noSender)
context.stop(self)
}
def sendBuffered(): Unit = {
val iter = buffer.iterator()
while (iter.hasNext())
persistentActor.tell(iter.next(), Actor.noSender)
buffer.clear()
}
def logIssue(errMsg: String): Unit = mode match {
case Warn | RepairByDiscardOld log.warning(errMsg)
case Fail log.error(errMsg)
case Disabled throw new IllegalArgumentException("mode must not be Disabled")
}
def fail(cause: IllegalStateException): Unit = {
buffer.clear()
persistentActor.tell(ReplayMessagesFailure(cause), Actor.noSender)
context.become {
case _: ReplayedMessage // discard
case msg @ (_: RecoverySuccess | _: ReplayMessagesFailure)
context.stop(self)
}
}
}

View file

@ -23,15 +23,10 @@ trait SnapshotStore extends Actor with ActorLogging {
private val publish = extension.settings.internal.publishPluginCommands
private val breaker = {
val cfg = context.system.settings.config
val cbConfig =
if (cfg.hasPath(self.path.name + ".circuit-breaker"))
cfg.getConfig(self.path.name + ".circuit-breaker")
.withFallback(cfg.getConfig("akka.persistence.default-circuit-breaker"))
else cfg.getConfig("akka.persistence.default-circuit-breaker")
val maxFailures = cbConfig.getInt("max-failures")
val callTimeout = cbConfig.getDuration("call-timeout", MILLISECONDS).millis
val resetTimeout = cbConfig.getDuration("reset-timeout", MILLISECONDS).millis
val cfg = extension.configFor(self)
val maxFailures = cfg.getInt("circuit-breaker.max-failures")
val callTimeout = cfg.getDuration("circuit-breaker.call-timeout", MILLISECONDS).millis
val resetTimeout = cfg.getDuration("circuit-breaker.reset-timeout", MILLISECONDS).millis
CircuitBreaker(context.system.scheduler, maxFailures, callTimeout, resetTimeout)
}

View file

@ -288,6 +288,34 @@ class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config(
expectMsg(List("a", "b", "err", "c", "d"))
}
"detect overlapping writers during replay" in {
val p1 = namedPersistentActor[Behavior1PersistentActor]
p1 ! Cmd("a")
p1 ! GetState
expectMsg(List("a-1", "a-2"))
// create another with same persistenceId
val p2 = namedPersistentActor[Behavior1PersistentActor]
p2 ! GetState
expectMsg(List("a-1", "a-2"))
// continue writing from the old writer
p1 ! Cmd("b")
p1 ! GetState
expectMsg(List("a-1", "a-2", "b-1", "b-2"))
p2 ! Cmd("c")
p2 ! GetState
expectMsg(List("a-1", "a-2", "c-1", "c-2"))
// Create yet another one with same persistenceId, b-1 and b-2 discarded during replay
EventFilter.warning(start = "Invalid replayed event", occurrences = 2) intercept {
val p3 = namedPersistentActor[Behavior1PersistentActor]
p3 ! GetState
expectMsg(List("a-1", "a-2", "c-1", "c-2"))
}
}
}
}

View file

@ -0,0 +1,226 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal
import scala.concurrent.duration._
import akka.actor._
import akka.testkit._
import akka.persistence.JournalProtocol
import akka.persistence.PersistentRepr
class ReplayFilterSpec extends AkkaSpec with ImplicitSender {
import JournalProtocol._
import ReplayFilter.{ Warn, Fail, RepairByDiscardOld }
val writerA = "writer-A"
val writerB = "writer-B"
val writerC = "writer-C"
val m1 = ReplayedMessage(PersistentRepr("a", 13, "p1", "", writerUuid = writerA))
val m2 = ReplayedMessage(PersistentRepr("b", 14, "p1", "", writerUuid = writerA))
val m3 = ReplayedMessage(PersistentRepr("c", 15, "p1", "", writerUuid = writerA))
val m4 = ReplayedMessage(PersistentRepr("d", 16, "p1", "", writerUuid = writerA))
val successMsg = RecoverySuccess(15)
"ReplayFilter in RepairByDiscardOld mode" must {
"pass on all replayed messages and then stop" in {
val filter = system.actorOf(ReplayFilter.props(
testActor, mode = RepairByDiscardOld, windowSize = 2, maxOldWriters = 10))
filter ! m1
filter ! m2
filter ! m3
filter ! successMsg
expectMsg(m1)
expectMsg(m2)
expectMsg(m3)
expectMsg(successMsg)
watch(filter)
expectTerminated(filter)
}
"pass on all replayed messages when switching writer" in {
val filter = system.actorOf(ReplayFilter.props(
testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10))
filter ! m1
filter ! m2
val m32 = m3.copy(persistent = m3.persistent.update(writerUuid = writerB))
filter ! m32
filter ! successMsg
expectMsg(m1)
expectMsg(m2)
expectMsg(m32)
expectMsg(successMsg)
}
"discard message with same seqNo from old overlapping writer" in {
val filter = system.actorOf(ReplayFilter.props(
testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10))
EventFilter.warning(start = "Invalid replayed event", occurrences = 1) intercept {
filter ! m1
filter ! m2
filter ! m3
val m3b = m3.copy(persistent = m3.persistent.update(writerUuid = writerB))
filter ! m3b // same seqNo as m3, but from writerB
filter ! successMsg
expectMsg(m1)
expectMsg(m2)
expectMsg(m3b) // discard m3, because same seqNo from new writer
expectMsg(successMsg)
}
}
"discard messages from old writer after switching writer" in {
val filter = system.actorOf(ReplayFilter.props(
testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10))
EventFilter.warning(start = "Invalid replayed event", occurrences = 2) intercept {
filter ! m1
filter ! m2
val m3b = m3.copy(persistent = m3.persistent.update(writerUuid = writerB))
filter ! m3b
filter ! m3
filter ! m4
filter ! successMsg
expectMsg(m1)
expectMsg(m2)
expectMsg(m3b)
// discard m3, m4
expectMsg(successMsg)
}
}
"discard messages from several old writers" in {
val filter = system.actorOf(ReplayFilter.props(
testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10))
EventFilter.warning(start = "Invalid replayed event", occurrences = 3) intercept {
filter ! m1
val m2b = m2.copy(persistent = m2.persistent.update(writerUuid = writerB))
filter ! m2b
val m3c = m3.copy(persistent = m3.persistent.update(writerUuid = writerC))
filter ! m3c
filter ! m2
filter ! m3
val m3b = m3.copy(persistent = m3.persistent.update(writerUuid = writerB))
filter ! m3b
val m4c = m4.copy(persistent = m4.persistent.update(writerUuid = writerC))
filter ! m4c
filter ! successMsg
expectMsg(m1)
expectMsg(m2b)
expectMsg(m3c)
// discard m2, m3, m3b
expectMsg(m4c)
expectMsg(successMsg)
}
}
}
"ReplayFilter in Fail mode" must {
"fail when message with same seqNo from old overlapping writer" in {
val filter = system.actorOf(ReplayFilter.props(
testActor, mode = Fail, windowSize = 100, maxOldWriters = 10))
EventFilter.error(start = "Invalid replayed event", occurrences = 1) intercept {
filter ! m1
filter ! m2
filter ! m3
val m3b = m3.copy(persistent = m3.persistent.update(writerUuid = writerB))
filter ! m3b // same seqNo as m3, but from writerB
filter ! successMsg
expectMsgType[ReplayMessagesFailure].cause.getClass should be(classOf[IllegalStateException])
}
}
"fail when messages from old writer after switching writer" in {
val filter = system.actorOf(ReplayFilter.props(
testActor, mode = Fail, windowSize = 100, maxOldWriters = 10))
EventFilter.error(start = "Invalid replayed event", occurrences = 1) intercept {
filter ! m1
filter ! m2
val m3b = m3.copy(persistent = m3.persistent.update(writerUuid = writerB))
filter ! m3b
filter ! m3
filter ! m4
filter ! successMsg
expectMsgType[ReplayMessagesFailure].cause.getClass should be(classOf[IllegalStateException])
}
}
}
"ReplayFilter in Warn mode" must {
"warn about message with same seqNo from old overlapping writer" in {
val filter = system.actorOf(ReplayFilter.props(
testActor, mode = Warn, windowSize = 100, maxOldWriters = 10))
EventFilter.warning(start = "Invalid replayed event", occurrences = 1) intercept {
filter ! m1
filter ! m2
filter ! m3
val m3b = m3.copy(persistent = m3.persistent.update(writerUuid = writerB))
filter ! m3b // same seqNo as m3, but from writerB
filter ! successMsg
expectMsg(m1)
expectMsg(m2)
expectMsg(m3)
expectMsg(m3b)
expectMsg(successMsg)
}
}
"warn about messages from old writer after switching writer" in {
val filter = system.actorOf(ReplayFilter.props(
testActor, mode = Warn, windowSize = 100, maxOldWriters = 10))
EventFilter.warning(start = "Invalid replayed event", occurrences = 2) intercept {
filter ! m1
filter ! m2
val m3b = m3.copy(persistent = m3.persistent.update(writerUuid = writerB))
filter ! m3b
filter ! m3
filter ! m4
filter ! successMsg
expectMsg(m1)
expectMsg(m2)
expectMsg(m3b)
expectMsg(m3)
expectMsg(m4)
expectMsg(successMsg)
}
}
"warn about messages from several old writers" in {
val filter = system.actorOf(ReplayFilter.props(
testActor, mode = Warn, windowSize = 100, maxOldWriters = 10))
EventFilter.warning(start = "Invalid replayed event", occurrences = 3) intercept {
filter ! m1
val m2b = m2.copy(persistent = m2.persistent.update(writerUuid = writerB))
filter ! m2b
val m3c = m3.copy(persistent = m3.persistent.update(writerUuid = writerC))
filter ! m3c
filter ! m2
filter ! m3
val m3b = m3.copy(persistent = m3.persistent.update(writerUuid = writerB))
filter ! m3b
val m4c = m4.copy(persistent = m4.persistent.update(writerUuid = writerC))
filter ! m4c
filter ! successMsg
expectMsg(m1)
expectMsg(m2b)
expectMsg(m3c)
expectMsg(m2)
expectMsg(m3)
expectMsg(m3b)
expectMsg(m4c)
expectMsg(successMsg)
}
}
}
}