From c5851600f0e1025dda3877fa934a852ab2020925 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 12 Aug 2015 13:32:30 +0200 Subject: [PATCH] +per #17837 Detect (and repair) corrupt event streams refactoring of plugin config --- .../project/migration-guide-2.3.x-2.4.x.rst | 6 + .../src/main/resources/reference.conf | 81 +++++-- .../scala/akka/persistence/Eventsourced.scala | 2 +- .../scala/akka/persistence/Persistence.scala | 106 ++++---- .../journal/AsyncWriteJournal.scala | 43 ++-- .../persistence/journal/ReplayFilter.scala | 155 ++++++++++++ .../persistence/snapshot/SnapshotStore.scala | 13 +- .../PersistentActorFailureSpec.scala | 28 +++ .../persistence/fsm/PersistentFSMSpec.scala | 14 +- .../journal/ReplayFilterSpec.scala | 226 ++++++++++++++++++ 10 files changed, 573 insertions(+), 101 deletions(-) create mode 100644 akka-persistence/src/main/scala/akka/persistence/journal/ReplayFilter.scala create mode 100644 akka-persistence/src/test/scala/akka/persistence/journal/ReplayFilterSpec.scala diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index 305e0a34bd..5c088e7b55 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -386,6 +386,12 @@ have explicitly relied on this behaviour, however if you find yourself with an a should rewrite it to explicitly store the ``ActorPath`` of where such replies during replay may have to be sent to, instead of relying on the sender reference during replay. +max-message-batch-size config +----------------------------- + +Configuration property ``akka.persistence.journal.max-message-batch-size`` has been moved into the plugin configuration +section, to allow different values for different journal plugins. See ``reference.conf``. + Persistence Plugin APIs ======================= diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index 2c32a1aa1c..750515793a 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -21,10 +21,6 @@ akka.persistence { # Absolute path to the journal plugin configuration entry used by persistent actor or view by default. # Persistent actor or view can override `journalPluginId` method in order to rely on a different journal plugin. plugin = "" - # Maximum size of a persistent message batch written to the journal. - max-message-batch-size = 200 - # Maximum size of a deletion batch written to the journal. - max-deletion-batch-size = 10000 } # Default snapshot store settings. snapshot-store { @@ -82,11 +78,65 @@ akka.persistence { } } } - # CircuitBreaker setting if not defined in plugin config section named circuit-breaker - default-circuit-breaker { - max-failures = 10 - call-timeout = 10s - reset-timeout = 30s + + # Fallback settings for journal plugin configurations. + # These settings are used if they are not defined in plugin config section. + journal-plugin-fallback { + + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + + # Dispatcher for message replay. + replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher" + + # Set this to true if plugin actor has a constructor which expects plugin + # configuration entry. + inject-config = false + + # Maximum size of a persistent message batch written to the journal. 
+ max-message-batch-size = 200 + + circuit-breaker { + max-failures = 10 + call-timeout = 10s + reset-timeout = 30s + } + + # The replay filter can detect a corrupt event stream by inspecting + # sequence numbers and writerUuid when replaying events. + replay-filter { + # What the filter should do when detecting invalid events. + # Supported values: + # `repair-by-discard-old` : discard events from old writers, + # warning is logged + # `fail` : fail the replay, error is logged + # `warn` : log warning but emit events untouched + # `off` : disable this feature completely + mode = repair-by-discard-old + + # It uses a look ahead buffer for analyzing the events. + # This defines the size (in number of events) of the buffer. + window-size = 100 + + # How many old writerUuid to remember + max-old-writers = 10 + } + } + + # Fallback settings for snapshot store plugin configurations + # These settings are used if they are not defined in plugin config section. + snapshot-store-plugin-fallback { + + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + + inject-config = false + + circuit-breaker { + max-failures = 5 + call-timeout = 20s + reset-timeout = 60s + } } } @@ -106,6 +156,7 @@ akka.actor { } } + ################################################### # Persistence plugins included with the extension # ################################################### @@ -132,12 +183,6 @@ akka.persistence.snapshot-store.local { # yet older snapshot files are available. Each recovery attempt will try # to recover using an older than previously failed-on snapshot file (if any are present). max-load-attempts = 3 - # CircuitBreaker settings - circuit-breaker { - max-failures = 5 - call-timeout = 20s - reset-timeout = 60s - } } # LevelDB journal plugin. @@ -157,12 +202,6 @@ akka.persistence.journal.leveldb { checksum = off # Native LevelDB (via JNI) or LevelDB Java port. native = on - # CircuitBreaker settings - circuit-breaker { - max-failures = 10 - call-timeout = 10s - reset-timeout = 30s - } } # Shared LevelDB journal plugin (for testing only). 
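
The new ``journal-plugin-fallback`` and ``snapshot-store-plugin-fallback`` sections above act as defaults that are merged underneath each plugin's own configuration entry. Below is a minimal, self-contained sketch of that merge using only the Typesafe Config library; the plugin id ``my-journal`` and the inlined defaults are illustrative (copied from the fallback section in this patch), not a prescribed setup.

import com.typesafe.config.ConfigFactory

object PluginConfigFallbackExample extends App {
  // Hypothetical per-plugin overrides for a journal plugin
  // (the id "my-journal" is made up for this sketch).
  val pluginSection = ConfigFactory.parseString(
    """
    replay-filter.mode = fail
    max-message-batch-size = 500
    """)

  // The relevant defaults from akka.persistence.journal-plugin-fallback,
  // inlined here so the sketch runs with the Config library alone.
  val fallback = ConfigFactory.parseString(
    """
    max-message-batch-size = 200
    replay-filter {
      mode = repair-by-discard-old
      window-size = 100
      max-old-writers = 10
    }
    """)

  // Mirrors what Persistence.PluginHolderExtensionId now does: the plugin's
  // own section wins, anything missing comes from the fallback section.
  val merged = pluginSection.withFallback(fallback)

  println(merged.getString("replay-filter.mode"))     // fail (override)
  println(merged.getInt("replay-filter.window-size")) // 100 (fallback)
  println(merged.getInt("max-message-batch-size"))    // 500 (override)
}
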
diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 2117ad40fc..95a4a3cd0d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -51,7 +51,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas private val writerUuid = UUID.randomUUID.toString private var journalBatch = Vector.empty[PersistentEnvelope] - private val maxMessageBatchSize = extension.settings.journal.maxMessageBatchSize + private val maxMessageBatchSize = extension.journalConfigFor(journalPluginId).getInt("max-message-batch-size") private var writeInProgress = false private var sequenceNr: Long = 0L private var _lastSequenceNr: Long = 0L diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index 3a383f75e5..1c468dedcb 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -5,28 +5,20 @@ package akka.persistence import java.util.concurrent.atomic.AtomicReference - import akka.actor._ import akka.dispatch.Dispatchers import akka.event.{ Logging, LoggingAdapter } -import akka.persistence.journal.{ AsyncWriteJournal, EventAdapters, IdentityEventAdapters } +import akka.persistence.journal.{ AsyncWriteJournal, EventAdapters, IdentityEventAdapters, ReplayFilter } import akka.util.Helpers.ConfigOps import com.typesafe.config.Config - import scala.annotation.tailrec import scala.concurrent.duration._ +import java.util.Locale /** * Persistence configuration. */ final class PersistenceSettings(config: Config) { - object journal { - val maxMessageBatchSize: Int = - config.getInt("journal.max-message-batch-size") - - val maxDeletionBatchSize: Int = - config.getInt("journal.max-deletion-batch-size") - } object view { val autoUpdate: Boolean = @@ -124,7 +116,8 @@ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider { def createExtension(system: ExtendedActorSystem): Persistence = new Persistence(system) def lookup() = Persistence /** INTERNAL API. */ - private[persistence] case class PluginHolder(actor: ActorRef, adapters: EventAdapters) extends Extension + private[persistence] case class PluginHolder(actor: ActorRef, adapters: EventAdapters, config: Config) + extends Extension } /** @@ -164,28 +157,23 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { /** Check for default or missing identity. */ private def isEmpty(text: String) = text == null || text.length == 0 - /** Discovered persistence journal plugins. */ + /** Discovered persistence journal and snapshot store plugins. */ private val journalPluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty) /** Discovered persistence snapshot store plugins. */ private val snapshotPluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty) + private val journalFallbackConfigPath = "akka.persistence.journal-plugin-fallback" + private val snapshotStoreFallbackConfigPath = "akka.persistence.snapshot-store-plugin-fallback" + /** * Returns an [[akka.persistence.journal.EventAdapters]] object which serves as a per-journal collection of bound event adapters. 
* If no adapters are registered for a given journal the EventAdapters object will simply return the identity * adapter for each class, otherwise the most specific adapter matching a given class will be returned. */ - @tailrec final def adaptersFor(journalPluginId: String): EventAdapters = { + final def adaptersFor(journalPluginId: String): EventAdapters = { val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId - val extensionIdMap = journalPluginExtensionId.get - extensionIdMap.get(configPath) match { - case Some(extensionId) ⇒ - extensionId(system).adapters - case None ⇒ - val extensionId = new PluginHolderExtensionId(configPath) - journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) - adaptersFor(journalPluginId) // Recursive invocation. - } + pluginHolderFor(configPath, journalFallbackConfigPath).adapters } /** @@ -202,22 +190,38 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { } /** + * INTERNAL API + * Returns the plugin config identified by `pluginId`. + * When empty, looks in `akka.persistence.journal.plugin` to find configuration entry path. + * When configured, uses `journalPluginId` as absolute path to the journal configuration entry. + */ + private[akka] final def journalConfigFor(journalPluginId: String): Config = { + val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId + pluginHolderFor(configPath, journalFallbackConfigPath).config + } + + /** + * INTERNAL API + * Looks up the plugin config by plugin's ActorRef. + */ + private[akka] final def configFor(journalPluginActor: ActorRef): Config = + journalPluginExtensionId.get().values.collectFirst { + case ext if ext(system).actor == journalPluginActor ⇒ ext(system).config + } match { + case Some(conf) ⇒ conf + case None ⇒ throw new IllegalArgumentException(s"Unknow plugin actor $journalPluginActor") + } + + /** + * INTERNAL API * Returns a journal plugin actor identified by `journalPluginId`. * When empty, looks in `akka.persistence.journal.plugin` to find configuration entry path. * When configured, uses `journalPluginId` as absolute path to the journal configuration entry. * Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`. */ - @tailrec private[akka] final def journalFor(journalPluginId: String): ActorRef = { + private[akka] final def journalFor(journalPluginId: String): ActorRef = { val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId - val extensionIdMap = journalPluginExtensionId.get - extensionIdMap.get(configPath) match { - case Some(extensionId) ⇒ - extensionId(system).actor - case None ⇒ - val extensionId = new PluginHolderExtensionId(configPath) - journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) - journalFor(journalPluginId) // Recursive invocation. - } + pluginHolderFor(configPath, journalFallbackConfigPath).actor } /** @@ -228,29 +232,30 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { * When configured, uses `snapshotPluginId` as absolute path to the snapshot store configuration entry. * Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`. 
*/ - @tailrec private[akka] final def snapshotStoreFor(snapshotPluginId: String): ActorRef = { + private[akka] final def snapshotStoreFor(snapshotPluginId: String): ActorRef = { val configPath = if (isEmpty(snapshotPluginId)) defaultSnapshotPluginId else snapshotPluginId - val extensionIdMap = snapshotPluginExtensionId.get + pluginHolderFor(configPath, snapshotStoreFallbackConfigPath).actor + } + + @tailrec private def pluginHolderFor(configPath: String, fallbackPath: String): PluginHolder = { + val extensionIdMap = journalPluginExtensionId.get extensionIdMap.get(configPath) match { case Some(extensionId) ⇒ - extensionId(system).actor + extensionId(system) case None ⇒ - val extensionId = new PluginHolderExtensionId(configPath) - snapshotPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) - snapshotStoreFor(snapshotPluginId) // Recursive invocation. + val extensionId = new PluginHolderExtensionId(configPath, fallbackPath) + journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) + pluginHolderFor(configPath, fallbackPath) // Recursive invocation. } } - private def createPlugin(configPath: String): ActorRef = { - require(!isEmpty(configPath) && system.settings.config.hasPath(configPath), - s"'reference.conf' is missing persistence plugin config path: '$configPath'") + private def createPlugin(configPath: String, pluginConfig: Config): ActorRef = { val pluginActorName = configPath - val pluginConfig = system.settings.config.getConfig(configPath) val pluginClassName = pluginConfig.getString("class") log.debug(s"Create plugin: $pluginActorName $pluginClassName") val pluginClass = system.dynamicAccess.getClassFor[Any](pluginClassName).get - val pluginInjectConfig = if (pluginConfig.hasPath("inject-config")) pluginConfig.getBoolean("inject-config") else false - val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else DefaultPluginDispatcherId + val pluginInjectConfig = pluginConfig.getBoolean("inject-config") + val pluginDispatcherId = pluginConfig.getString("plugin-dispatcher") val pluginActorArgs = if (pluginInjectConfig) List(pluginConfig) else Nil val pluginActorProps = Props(Deploy(dispatcher = pluginDispatcherId), pluginClass, pluginActorArgs) system.systemActorOf(pluginActorProps, pluginActorName) @@ -266,11 +271,16 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { private def id(ref: ActorRef) = ref.path.toStringWithoutAddress - private class PluginHolderExtensionId(configPath: String) extends ExtensionId[PluginHolder] { + private class PluginHolderExtensionId(configPath: String, fallbackPath: String) extends ExtensionId[PluginHolder] { override def createExtension(system: ExtendedActorSystem): PluginHolder = { - val plugin = createPlugin(configPath) - val adapters = createAdapters(configPath) - PluginHolder(plugin, adapters) + require(!isEmpty(configPath) && system.settings.config.hasPath(configPath), + s"'reference.conf' is missing persistence plugin config path: '$configPath'") + val config: Config = system.settings.config.getConfig(configPath) + .withFallback(system.settings.config.getConfig(fallbackPath)) + val plugin: ActorRef = createPlugin(configPath, config) + val adapters: EventAdapters = createAdapters(configPath) + + PluginHolder(plugin, adapters, config) } } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala 
b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index c58c00c3c8..73a6e7e481 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -14,6 +14,7 @@ import scala.concurrent.Future import scala.util.{ Failure, Success, Try } import scala.util.control.NonFatal import akka.pattern.CircuitBreaker +import java.util.Locale /** * Abstract journal, optimized for asynchronous, non-blocking writes. @@ -25,23 +26,31 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { private val extension = Persistence(context.system) private val publish = extension.settings.internal.publishPluginCommands + private val config = extension.configFor(self) + + private val breaker = { + val maxFailures = config.getInt("circuit-breaker.max-failures") + val callTimeout = config.getDuration("circuit-breaker.call-timeout", MILLISECONDS).millis + val resetTimeout = config.getDuration("circuit-breaker.reset-timeout", MILLISECONDS).millis + CircuitBreaker(context.system.scheduler, maxFailures, callTimeout, resetTimeout) + } + + private val replayFilterMode: ReplayFilter.Mode = + config.getString("replay-filter.mode").toLowerCase(Locale.ROOT) match { + case "off" ⇒ ReplayFilter.Disabled + case "repair-by-discard-old" ⇒ ReplayFilter.RepairByDiscardOld + case "fail" ⇒ ReplayFilter.Fail + case "warn" ⇒ ReplayFilter.Warn + case other ⇒ throw new IllegalArgumentException( + s"invalid replay-filter.mode [$other], supported values [off, repair, fail, warn]") + } + private def isReplayFilterEnabled: Boolean = replayFilterMode != ReplayFilter.Disabled + private val replayFilterWindowSize: Int = config.getInt("replay-filter.window-size") + private val replayFilterMaxOldWriters: Int = config.getInt("replay-filter.max-old-writers") private val resequencer = context.actorOf(Props[Resequencer]()) private var resequencerCounter = 1L - private val breaker = { - val cfg = context.system.settings.config - val cbConfig = - if (cfg.hasPath(self.path.name + ".circuit-breaker")) - cfg.getConfig(self.path.name + ".circuit-breaker") - .withFallback(cfg.getConfig("akka.persistence.default-circuit-breaker")) - else cfg.getConfig("akka.persistence.default-circuit-breaker") - val maxFailures = cbConfig.getInt("max-failures") - val callTimeout = cbConfig.getDuration("call-timeout", MILLISECONDS).millis - val resetTimeout = cbConfig.getDuration("reset-timeout", MILLISECONDS).millis - CircuitBreaker(context.system.scheduler, maxFailures, callTimeout, resetTimeout) - } - final def receive = receiveWriteJournal.orElse[Any, Unit](receivePluginInternal) final val receiveWriteJournal: Actor.Receive = { @@ -110,6 +119,10 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { } case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor) ⇒ + val replyTo = + if (isReplayFilterEnabled) context.actorOf(ReplayFilter.props(persistentActor, replayFilterMode, + replayFilterWindowSize, replayFilterMaxOldWriters)) + else persistentActor breaker.withCircuitBreaker(asyncReadHighestSequenceNr(persistenceId, fromSequenceNr)) .flatMap { highSeqNr ⇒ @@ -123,7 +136,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { asyncReplayMessages(persistenceId, fromSequenceNr, toSeqNr, max) { p ⇒ if (!p.deleted) // old records from 2.3 may still have the deleted flag adaptFromJournal(p).foreach { 
adaptedPersistentRepr ⇒ - persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender) + replyTo.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender) } }.map(_ ⇒ highSeqNr) } @@ -131,7 +144,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { highSeqNr ⇒ RecoverySuccess(highSeqNr) }.recover { case e ⇒ ReplayMessagesFailure(e) - }.pipeTo(persistentActor).onSuccess { + }.pipeTo(replyTo).onSuccess { case _ if publish ⇒ context.system.eventStream.publish(r) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/ReplayFilter.scala b/akka-persistence/src/main/scala/akka/persistence/journal/ReplayFilter.scala new file mode 100644 index 0000000000..a3b8608b0a --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/ReplayFilter.scala @@ -0,0 +1,155 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.journal + +import akka.actor.ActorRef +import akka.actor.Actor +import akka.persistence.JournalProtocol +import java.util.LinkedList +import akka.actor.Props +import akka.actor.ActorLogging +import scala.collection.mutable.LinkedHashSet + +/** + * INTERNAL API + * + * Detect corrupt event stream during replay. It uses the the writerUuid and the + * sequenceNr in the replayed events to find events emitted by overlapping writers. + */ +private[akka] object ReplayFilter { + def props( + persistentActor: ActorRef, + mode: Mode, + windowSize: Int, + maxOldWriters: Int): Props = { + require(windowSize > 0, "windowSize must be > 0") + require(maxOldWriters > 0, "maxOldWriters must be > 0") + require(mode != Disabled, "mode must not be Disabled") + Props(new ReplayFilter(persistentActor, mode, windowSize, maxOldWriters)) + } + + sealed trait Mode + case object Fail extends Mode + case object Warn extends Mode + case object RepairByDiscardOld extends Mode + case object Disabled extends Mode +} + +/** + * INTERNAL API + */ +private[akka] class ReplayFilter(persistentActor: ActorRef, mode: ReplayFilter.Mode, + windowSize: Int, maxOldWriters: Int) + extends Actor with ActorLogging { + import JournalProtocol._ + import ReplayFilter.{ Warn, Fail, RepairByDiscardOld, Disabled } + + val buffer = new LinkedList[ReplayedMessage]() + val oldWriters = LinkedHashSet.empty[String] + var writerUuid = "" + var seqNo = -1L + + def receive = { + case r @ ReplayedMessage(persistent) ⇒ + try { + if (buffer.size == windowSize) { + val msg = buffer.removeFirst() + persistentActor.tell(msg, Actor.noSender) + } + + if (r.persistent.writerUuid == writerUuid) { + // from same writer + if (r.persistent.sequenceNr < seqNo) { + val errMsg = s"Invalid replayed event [${r.persistent.sequenceNr}] in wrong order from " + + s"writer [${r.persistent.writerUuid}] with persistenceId [${r.persistent.persistenceId}]" + logIssue(errMsg) + mode match { + case RepairByDiscardOld ⇒ // discard + case Fail ⇒ throw new IllegalStateException(errMsg) + case Warn ⇒ buffer.add(r) + case Disabled ⇒ throw new IllegalArgumentException("mode must not be Disabled") + } + } else { + // note that it is alright with == seqNo, since such may be emitted EventSeq + buffer.add(r) + seqNo = r.persistent.sequenceNr + } + + } else if (oldWriters.contains(r.persistent.writerUuid)) { + // from old writer + val errMsg = s"Invalid replayed event [${r.persistent.sequenceNr}] from old " + + s"writer [${r.persistent.writerUuid}] with persistenceId [${r.persistent.persistenceId}]" + logIssue(errMsg) + mode match { + case RepairByDiscardOld ⇒ // discard + case Fail ⇒ 
throw new IllegalStateException(errMsg) + case Warn ⇒ buffer.add(r) + case Disabled ⇒ throw new IllegalArgumentException("mode must not be Disabled") + } + + } else { + // from new writer + if (writerUuid != "") + oldWriters.add(writerUuid) + if (oldWriters.size > maxOldWriters) + oldWriters.remove(oldWriters.head) + + writerUuid = r.persistent.writerUuid + seqNo = r.persistent.sequenceNr + + // clear the buffer from messages from other writers with higher seqNo + val iter = buffer.iterator() + while (iter.hasNext()) { + val msg = iter.next() + if (msg.persistent.sequenceNr >= seqNo) { + val errMsg = s"Invalid replayed event [${msg.persistent.sequenceNr}] in buffer from old " + + s"writer [${msg.persistent.writerUuid}] with persistenceId [${msg.persistent.persistenceId}]" + logIssue(errMsg) + mode match { + case RepairByDiscardOld ⇒ iter.remove() // discard + case Fail ⇒ throw new IllegalStateException(errMsg) + case Warn ⇒ // keep + case Disabled ⇒ throw new IllegalArgumentException("mode must not be Disabled") + } + + } + } + + buffer.add(r) + } + + } catch { + case e: IllegalStateException if mode == Fail ⇒ fail(e) + } + + case msg @ (_: RecoverySuccess | _: ReplayMessagesFailure) ⇒ + sendBuffered() + persistentActor.tell(msg, Actor.noSender) + context.stop(self) + } + + def sendBuffered(): Unit = { + val iter = buffer.iterator() + while (iter.hasNext()) + persistentActor.tell(iter.next(), Actor.noSender) + buffer.clear() + } + + def logIssue(errMsg: String): Unit = mode match { + case Warn | RepairByDiscardOld ⇒ log.warning(errMsg) + case Fail ⇒ log.error(errMsg) + case Disabled ⇒ throw new IllegalArgumentException("mode must not be Disabled") + } + + def fail(cause: IllegalStateException): Unit = { + buffer.clear() + persistentActor.tell(ReplayMessagesFailure(cause), Actor.noSender) + context.become { + case _: ReplayedMessage ⇒ // discard + case msg @ (_: RecoverySuccess | _: ReplayMessagesFailure) ⇒ + context.stop(self) + } + } + +} diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala index 0e29191e4e..53a3bd5afe 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala @@ -23,15 +23,10 @@ trait SnapshotStore extends Actor with ActorLogging { private val publish = extension.settings.internal.publishPluginCommands private val breaker = { - val cfg = context.system.settings.config - val cbConfig = - if (cfg.hasPath(self.path.name + ".circuit-breaker")) - cfg.getConfig(self.path.name + ".circuit-breaker") - .withFallback(cfg.getConfig("akka.persistence.default-circuit-breaker")) - else cfg.getConfig("akka.persistence.default-circuit-breaker") - val maxFailures = cbConfig.getInt("max-failures") - val callTimeout = cbConfig.getDuration("call-timeout", MILLISECONDS).millis - val resetTimeout = cbConfig.getDuration("reset-timeout", MILLISECONDS).millis + val cfg = extension.configFor(self) + val maxFailures = cfg.getInt("circuit-breaker.max-failures") + val callTimeout = cfg.getDuration("circuit-breaker.call-timeout", MILLISECONDS).millis + val resetTimeout = cfg.getDuration("circuit-breaker.reset-timeout", MILLISECONDS).millis CircuitBreaker(context.system.scheduler, maxFailures, callTimeout, resetTimeout) } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala 
b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala index ff2224e858..98e93b1834 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala @@ -288,6 +288,34 @@ class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config( expectMsg(List("a", "b", "err", "c", "d")) } + "detect overlapping writers during replay" in { + val p1 = namedPersistentActor[Behavior1PersistentActor] + p1 ! Cmd("a") + p1 ! GetState + expectMsg(List("a-1", "a-2")) + + // create another with same persistenceId + val p2 = namedPersistentActor[Behavior1PersistentActor] + p2 ! GetState + expectMsg(List("a-1", "a-2")) + + // continue writing from the old writer + p1 ! Cmd("b") + p1 ! GetState + expectMsg(List("a-1", "a-2", "b-1", "b-2")) + + p2 ! Cmd("c") + p2 ! GetState + expectMsg(List("a-1", "a-2", "c-1", "c-2")) + + // Create yet another one with same persistenceId, b-1 and b-2 discarded during replay + EventFilter.warning(start = "Invalid replayed event", occurrences = 2) intercept { + val p3 = namedPersistentActor[Behavior1PersistentActor] + p3 ! GetState + expectMsg(List("a-1", "a-2", "c-1", "c-2")) + } + } + } } diff --git a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala index 6ed0997a77..e315752949 100644 --- a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala @@ -5,7 +5,7 @@ package akka.persistence.fsm import akka.actor._ -import akka.persistence.{PersistentActor, RecoveryCompleted, PersistenceSpec} +import akka.persistence.{ PersistentActor, RecoveryCompleted, PersistenceSpec } import akka.persistence.fsm.PersistentFSM._ import akka.testkit._ import com.typesafe.config.Config @@ -266,13 +266,13 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) val persistentEventsStreamer = system.actorOf(PersistentEventsStreamer.props(persistenceId, testActor)) expectMsg(ItemAdded(Item("1", "Shirt", 59.99F))) - expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted + expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted expectMsg(ItemAdded(Item("2", "Shoes", 89.99F))) - expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted + expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted expectMsg(ItemAdded(Item("3", "Coat", 119.99F))) - expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted + expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted expectMsg(OrderExecuted) expectMsgType[StateChangeEvent] @@ -431,13 +431,13 @@ object PersistentFSMSpec { def props(persistenceId: String, reportActor: ActorRef) = Props(new WebStoreCustomerFSM(persistenceId, reportActor)) } - + class PersistentEventsStreamer(id: String, client: ActorRef) extends PersistentActor { override val persistenceId: String = id def receiveRecover = { - case RecoveryCompleted ⇒ // do nothing - case persistentEvent ⇒ client ! persistentEvent + case RecoveryCompleted ⇒ // do nothing + case persistentEvent ⇒ client ! 
persistentEvent } def receiveCommand = { diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/ReplayFilterSpec.scala b/akka-persistence/src/test/scala/akka/persistence/journal/ReplayFilterSpec.scala new file mode 100644 index 0000000000..d66e440de5 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/journal/ReplayFilterSpec.scala @@ -0,0 +1,226 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence.journal + +import scala.concurrent.duration._ +import akka.actor._ +import akka.testkit._ +import akka.persistence.JournalProtocol +import akka.persistence.PersistentRepr + +class ReplayFilterSpec extends AkkaSpec with ImplicitSender { + import JournalProtocol._ + import ReplayFilter.{ Warn, Fail, RepairByDiscardOld } + + val writerA = "writer-A" + val writerB = "writer-B" + val writerC = "writer-C" + + val m1 = ReplayedMessage(PersistentRepr("a", 13, "p1", "", writerUuid = writerA)) + val m2 = ReplayedMessage(PersistentRepr("b", 14, "p1", "", writerUuid = writerA)) + val m3 = ReplayedMessage(PersistentRepr("c", 15, "p1", "", writerUuid = writerA)) + val m4 = ReplayedMessage(PersistentRepr("d", 16, "p1", "", writerUuid = writerA)) + val successMsg = RecoverySuccess(15) + + "ReplayFilter in RepairByDiscardOld mode" must { + "pass on all replayed messages and then stop" in { + val filter = system.actorOf(ReplayFilter.props( + testActor, mode = RepairByDiscardOld, windowSize = 2, maxOldWriters = 10)) + filter ! m1 + filter ! m2 + filter ! m3 + filter ! successMsg + + expectMsg(m1) + expectMsg(m2) + expectMsg(m3) + expectMsg(successMsg) + + watch(filter) + expectTerminated(filter) + } + + "pass on all replayed messages when switching writer" in { + val filter = system.actorOf(ReplayFilter.props( + testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10)) + filter ! m1 + filter ! m2 + val m32 = m3.copy(persistent = m3.persistent.update(writerUuid = writerB)) + filter ! m32 + filter ! successMsg + + expectMsg(m1) + expectMsg(m2) + expectMsg(m32) + expectMsg(successMsg) + } + + "discard message with same seqNo from old overlapping writer" in { + val filter = system.actorOf(ReplayFilter.props( + testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10)) + EventFilter.warning(start = "Invalid replayed event", occurrences = 1) intercept { + filter ! m1 + filter ! m2 + filter ! m3 + val m3b = m3.copy(persistent = m3.persistent.update(writerUuid = writerB)) + filter ! m3b // same seqNo as m3, but from writerB + filter ! successMsg + + expectMsg(m1) + expectMsg(m2) + expectMsg(m3b) // discard m3, because same seqNo from new writer + expectMsg(successMsg) + } + } + + "discard messages from old writer after switching writer" in { + val filter = system.actorOf(ReplayFilter.props( + testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10)) + EventFilter.warning(start = "Invalid replayed event", occurrences = 2) intercept { + filter ! m1 + filter ! m2 + val m3b = m3.copy(persistent = m3.persistent.update(writerUuid = writerB)) + filter ! m3b + filter ! m3 + filter ! m4 + filter ! successMsg + + expectMsg(m1) + expectMsg(m2) + expectMsg(m3b) + // discard m3, m4 + expectMsg(successMsg) + } + } + + "discard messages from several old writers" in { + val filter = system.actorOf(ReplayFilter.props( + testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10)) + EventFilter.warning(start = "Invalid replayed event", occurrences = 3) intercept { + filter ! 
m1 + val m2b = m2.copy(persistent = m2.persistent.update(writerUuid = writerB)) + filter ! m2b + val m3c = m3.copy(persistent = m3.persistent.update(writerUuid = writerC)) + filter ! m3c + filter ! m2 + filter ! m3 + val m3b = m3.copy(persistent = m3.persistent.update(writerUuid = writerB)) + filter ! m3b + val m4c = m4.copy(persistent = m4.persistent.update(writerUuid = writerC)) + filter ! m4c + filter ! successMsg + + expectMsg(m1) + expectMsg(m2b) + expectMsg(m3c) + // discard m2, m3, m3b + expectMsg(m4c) + expectMsg(successMsg) + } + } + } + + "ReplayFilter in Fail mode" must { + "fail when message with same seqNo from old overlapping writer" in { + val filter = system.actorOf(ReplayFilter.props( + testActor, mode = Fail, windowSize = 100, maxOldWriters = 10)) + EventFilter.error(start = "Invalid replayed event", occurrences = 1) intercept { + filter ! m1 + filter ! m2 + filter ! m3 + val m3b = m3.copy(persistent = m3.persistent.update(writerUuid = writerB)) + filter ! m3b // same seqNo as m3, but from writerB + filter ! successMsg + + expectMsgType[ReplayMessagesFailure].cause.getClass should be(classOf[IllegalStateException]) + } + } + + "fail when messages from old writer after switching writer" in { + val filter = system.actorOf(ReplayFilter.props( + testActor, mode = Fail, windowSize = 100, maxOldWriters = 10)) + EventFilter.error(start = "Invalid replayed event", occurrences = 1) intercept { + filter ! m1 + filter ! m2 + val m3b = m3.copy(persistent = m3.persistent.update(writerUuid = writerB)) + filter ! m3b + filter ! m3 + filter ! m4 + filter ! successMsg + + expectMsgType[ReplayMessagesFailure].cause.getClass should be(classOf[IllegalStateException]) + } + } + } + + "ReplayFilter in Warn mode" must { + "warn about message with same seqNo from old overlapping writer" in { + val filter = system.actorOf(ReplayFilter.props( + testActor, mode = Warn, windowSize = 100, maxOldWriters = 10)) + EventFilter.warning(start = "Invalid replayed event", occurrences = 1) intercept { + filter ! m1 + filter ! m2 + filter ! m3 + val m3b = m3.copy(persistent = m3.persistent.update(writerUuid = writerB)) + filter ! m3b // same seqNo as m3, but from writerB + filter ! successMsg + + expectMsg(m1) + expectMsg(m2) + expectMsg(m3) + expectMsg(m3b) + expectMsg(successMsg) + } + } + + "warn about messages from old writer after switching writer" in { + val filter = system.actorOf(ReplayFilter.props( + testActor, mode = Warn, windowSize = 100, maxOldWriters = 10)) + EventFilter.warning(start = "Invalid replayed event", occurrences = 2) intercept { + filter ! m1 + filter ! m2 + val m3b = m3.copy(persistent = m3.persistent.update(writerUuid = writerB)) + filter ! m3b + filter ! m3 + filter ! m4 + filter ! successMsg + + expectMsg(m1) + expectMsg(m2) + expectMsg(m3b) + expectMsg(m3) + expectMsg(m4) + expectMsg(successMsg) + } + } + + "warn about messages from several old writers" in { + val filter = system.actorOf(ReplayFilter.props( + testActor, mode = Warn, windowSize = 100, maxOldWriters = 10)) + EventFilter.warning(start = "Invalid replayed event", occurrences = 3) intercept { + filter ! m1 + val m2b = m2.copy(persistent = m2.persistent.update(writerUuid = writerB)) + filter ! m2b + val m3c = m3.copy(persistent = m3.persistent.update(writerUuid = writerC)) + filter ! m3c + filter ! m2 + filter ! m3 + val m3b = m3.copy(persistent = m3.persistent.update(writerUuid = writerB)) + filter ! m3b + val m4c = m4.copy(persistent = m4.persistent.update(writerUuid = writerC)) + filter ! m4c + filter ! 
successMsg + + expectMsg(m1) + expectMsg(m2b) + expectMsg(m3c) + expectMsg(m2) + expectMsg(m3) + expectMsg(m3b) + expectMsg(m4c) + expectMsg(successMsg) + } + } + } +}
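
For reference, the repair rule that ``ReplayFilter`` applies in ``repair-by-discard-old`` mode can be modelled as a small pure function over the replayed stream. This is a simplified, unbounded sketch, not Akka API: the ``Event`` case class and function name are illustrative, and the real actor additionally bounds its look-ahead buffer with ``window-size``, remembers at most ``max-old-writers`` superseded writers, and reacts to messages as they arrive rather than over a completed sequence.

object ReplayRepairModel extends App {
  final case class Event(writerUuid: String, seqNr: Long, payload: Any)

  // Keep events from the newest writer; drop out-of-order events from the
  // same writer, events from writers that have already been superseded, and
  // previously accepted events that a new writer overwrites.
  def repairByDiscardOld(replayed: Seq[Event]): Seq[Event] =
    replayed.foldLeft(Vector.empty[Event]) { (acc, e) ⇒
      acc.lastOption match {
        case Some(last) if last.writerUuid == e.writerUuid ⇒
          if (e.seqNr >= last.seqNr) acc :+ e else acc // out of order, discard
        case Some(_) if acc.exists(_.writerUuid == e.writerUuid) ⇒
          acc // stale event from an already superseded writer, discard
        case _ ⇒
          // a new writer takes over: discard accepted events it overwrites
          acc.filter(_.seqNr < e.seqNr) :+ e
      }
    }

  val a = "writer-A"
  val b = "writer-B"
  val stream = Seq(
    Event(a, 13, "a"), Event(a, 14, "b"),
    Event(b, 15, "c2"),                   // writer-B takes over at seqNr 15
    Event(a, 15, "c"), Event(a, 16, "d")) // stale events from writer-A

  println(repairByDiscardOld(stream).map(e ⇒ s"${e.writerUuid}:${e.seqNr}"))
  // Vector(writer-A:13, writer-A:14, writer-B:15)
}
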