+per #15816 Use CircuitBreaker
to protect agains unresponsive journal and snapshot store
This commit is contained in:
parent
9993e032cb
commit
39c2d6d944
5 changed files with 111 additions and 52 deletions
|
|
@ -80,6 +80,12 @@ akka.persistence {
|
|||
}
|
||||
}
|
||||
}
|
||||
# CircuitBreaker setting if not defined in plugin config section named circuit-breaker
|
||||
default-circuit-breaker {
|
||||
max-failures = 10
|
||||
call-timeout = 10s
|
||||
reset-timeout = 30s
|
||||
}
|
||||
}
|
||||
|
||||
# Protobuf serialization for the persistent extension messages.
|
||||
|
|
@ -124,6 +130,12 @@ akka.persistence.snapshot-store.local {
|
|||
# yet older snapshot files are available. Each recovery attempt will try
|
||||
# to recover using an older than previously failed-on snapshot file (if any are present).
|
||||
max-load-attempts = 3
|
||||
# CircuitBreaker settings
|
||||
circuit-breaker {
|
||||
max-failures = 5
|
||||
call-timeout = 20s
|
||||
reset-timeout = 60s
|
||||
}
|
||||
}
|
||||
|
||||
# LevelDB journal plugin.
|
||||
|
|
@ -143,6 +155,12 @@ akka.persistence.journal.leveldb {
|
|||
checksum = off
|
||||
# Native LevelDB (via JNI) or LevelDB Java port.
|
||||
native = on
|
||||
# CircuitBreaker settings
|
||||
circuit-breaker {
|
||||
max-failures = 10
|
||||
call-timeout = 10s
|
||||
reset-timeout = 30s
|
||||
}
|
||||
}
|
||||
|
||||
# Shared LevelDB journal plugin (for testing only).
|
||||
|
|
|
|||
|
|
@ -188,13 +188,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
|
|||
case Some(extensionId) ⇒
|
||||
extensionId(system).adapters
|
||||
case None ⇒
|
||||
val extensionId = new ExtensionId[PluginHolder] {
|
||||
override def createExtension(system: ExtendedActorSystem): PluginHolder = {
|
||||
val plugin = createPlugin(configPath)(journalDispatchSelector)
|
||||
val adapters = createAdapters(configPath)
|
||||
PluginHolder(plugin, adapters)
|
||||
}
|
||||
}
|
||||
val extensionId = new PluginHolderExtensionId(configPath)
|
||||
journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
|
||||
adaptersFor(journalPluginId) // Recursive invocation.
|
||||
}
|
||||
|
|
@ -226,13 +220,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
|
|||
case Some(extensionId) ⇒
|
||||
extensionId(system).actor
|
||||
case None ⇒
|
||||
val extensionId = new ExtensionId[PluginHolder] {
|
||||
override def createExtension(system: ExtendedActorSystem): PluginHolder = {
|
||||
val plugin = createPlugin(configPath)(journalDispatchSelector)
|
||||
val adapters = createAdapters(configPath)
|
||||
PluginHolder(plugin, adapters)
|
||||
}
|
||||
}
|
||||
val extensionId = new PluginHolderExtensionId(configPath)
|
||||
journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
|
||||
journalFor(journalPluginId) // Recursive invocation.
|
||||
}
|
||||
|
|
@ -251,13 +239,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
|
|||
case Some(extensionId) ⇒
|
||||
extensionId(system).actor
|
||||
case None ⇒
|
||||
val extensionId = new ExtensionId[PluginHolder] {
|
||||
override def createExtension(system: ExtendedActorSystem): PluginHolder = {
|
||||
val plugin = createPlugin(configPath)(snapshotDispatchSelector)
|
||||
val adapters = createAdapters(configPath)
|
||||
PluginHolder(plugin, adapters)
|
||||
}
|
||||
}
|
||||
val extensionId = new PluginHolderExtensionId(configPath)
|
||||
snapshotPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
|
||||
snapshotStoreFor(snapshotPluginId) // Recursive invocation.
|
||||
}
|
||||
|
|
@ -288,4 +270,12 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
|
|||
|
||||
private def id(ref: ActorRef) = ref.path.toStringWithoutAddress
|
||||
|
||||
private class PluginHolderExtensionId(configPath: String) extends ExtensionId[PluginHolder] {
|
||||
override def createExtension(system: ExtendedActorSystem): PluginHolder = {
|
||||
val plugin = createPlugin(configPath)(journalDispatchSelector)
|
||||
val adapters = createAdapters(configPath)
|
||||
PluginHolder(plugin, adapters)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,6 +27,12 @@ trait AsyncRecovery {
|
|||
* The `toSequenceNr` is the lowest of what was returned by [[#asyncReadHighestSequenceNr]]
|
||||
* and what the user specified as recovery [[akka.persistence.Recovery]] parameter.
|
||||
*
|
||||
* This call is NOT protected with a circuit-breaker because it may take long time
|
||||
* to replay all events. The plugin implementation itself must protect against
|
||||
* an unresponsive backend store and make sure that the returned Future is
|
||||
* completed with success or failure within reasonable time. It is not allowed
|
||||
* to ignore completing the future.
|
||||
*
|
||||
* @param persistenceId persistent actor id.
|
||||
* @param fromSequenceNr sequence number where replay should start (inclusive).
|
||||
* @param toSequenceNr sequence number where replay should end (inclusive).
|
||||
|
|
@ -46,6 +52,8 @@ trait AsyncRecovery {
|
|||
* This sequence number is also used as `toSequenceNr` in subsequent call
|
||||
* to [[#asyncReplayMessages]] unless the user has specified a lower `toSequenceNr`.
|
||||
*
|
||||
* This call is protected with a circuit-breaker.
|
||||
*
|
||||
* @param persistenceId persistent actor id.
|
||||
* @param fromSequenceNr hint where to start searching for the highest sequence
|
||||
* number.
|
||||
|
|
|
|||
|
|
@ -5,14 +5,15 @@
|
|||
|
||||
package akka.persistence.journal
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import akka.actor._
|
||||
import akka.pattern.pipe
|
||||
import akka.persistence._
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.Future
|
||||
import scala.util.{ Failure, Success, Try }
|
||||
import scala.util.control.NonFatal
|
||||
import akka.pattern.CircuitBreaker
|
||||
|
||||
/**
|
||||
* Abstract journal, optimized for asynchronous, non-blocking writes.
|
||||
|
|
@ -28,6 +29,19 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
|||
private val resequencer = context.actorOf(Props[Resequencer]())
|
||||
private var resequencerCounter = 1L
|
||||
|
||||
private val breaker = {
|
||||
val cfg = context.system.settings.config
|
||||
val cbConfig =
|
||||
if (cfg.hasPath(self.path.name + ".circuit-breaker"))
|
||||
cfg.getConfig(self.path.name + ".circuit-breaker")
|
||||
.withFallback(cfg.getConfig("akka.persistence.default-circuit-breaker"))
|
||||
else cfg.getConfig("akka.persistence.default-circuit-breaker")
|
||||
val maxFailures = cbConfig.getInt("max-failures")
|
||||
val callTimeout = cbConfig.getDuration("call-timeout", MILLISECONDS).millis
|
||||
val resetTimeout = cbConfig.getDuration("reset-timeout", MILLISECONDS).millis
|
||||
CircuitBreaker(context.system.scheduler, maxFailures, callTimeout, resetTimeout)
|
||||
}
|
||||
|
||||
final def receive = receiveWriteJournal.orElse[Any, Unit](receivePluginInternal)
|
||||
|
||||
final val receiveWriteJournal: Actor.Receive = {
|
||||
|
|
@ -39,8 +53,9 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
|||
val prepared = Try(preparePersistentBatch(messages))
|
||||
val writeResult = (prepared match {
|
||||
case Success(prep) ⇒
|
||||
// in case the asyncWriteMessages throws
|
||||
try asyncWriteMessages(prep) catch { case NonFatal(e) ⇒ Future.failed(e) }
|
||||
// try in case the asyncWriteMessages throws
|
||||
try breaker.withCircuitBreaker(asyncWriteMessages(prep))
|
||||
catch { case NonFatal(e) ⇒ Future.failed(e) }
|
||||
case f @ Failure(_) ⇒
|
||||
// exception from preparePersistentBatch => rejected
|
||||
Future.successful(messages.collect { case a: AtomicWrite ⇒ f })
|
||||
|
|
@ -96,30 +111,32 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
|||
|
||||
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor) ⇒
|
||||
|
||||
asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).flatMap { highSeqNr ⇒
|
||||
val toSeqNr = math.min(toSequenceNr, highSeqNr)
|
||||
if (highSeqNr == 0L || fromSequenceNr > toSeqNr)
|
||||
Future.successful(highSeqNr)
|
||||
else {
|
||||
// Send replayed messages and replay result to persistentActor directly. No need
|
||||
// to resequence replayed messages relative to written and looped messages.
|
||||
asyncReplayMessages(persistenceId, fromSequenceNr, toSeqNr, max) { p ⇒
|
||||
if (!p.deleted) // old records from 2.3 may still have the deleted flag
|
||||
adaptFromJournal(p).foreach { adaptedPersistentRepr ⇒
|
||||
persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender)
|
||||
}
|
||||
}.map(_ ⇒ highSeqNr)
|
||||
breaker.withCircuitBreaker(asyncReadHighestSequenceNr(persistenceId, fromSequenceNr))
|
||||
.flatMap { highSeqNr ⇒
|
||||
val toSeqNr = math.min(toSequenceNr, highSeqNr)
|
||||
if (highSeqNr == 0L || fromSequenceNr > toSeqNr)
|
||||
Future.successful(highSeqNr)
|
||||
else {
|
||||
// Send replayed messages and replay result to persistentActor directly. No need
|
||||
// to resequence replayed messages relative to written and looped messages.
|
||||
// not possible to use circuit breaker here
|
||||
asyncReplayMessages(persistenceId, fromSequenceNr, toSeqNr, max) { p ⇒
|
||||
if (!p.deleted) // old records from 2.3 may still have the deleted flag
|
||||
adaptFromJournal(p).foreach { adaptedPersistentRepr ⇒
|
||||
persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender)
|
||||
}
|
||||
}.map(_ ⇒ highSeqNr)
|
||||
}
|
||||
}.map {
|
||||
highSeqNr ⇒ RecoverySuccess(highSeqNr)
|
||||
}.recover {
|
||||
case e ⇒ ReplayMessagesFailure(e)
|
||||
}.pipeTo(persistentActor).onSuccess {
|
||||
case _ if publish ⇒ context.system.eventStream.publish(r)
|
||||
}
|
||||
}.map {
|
||||
highSeqNr ⇒ RecoverySuccess(highSeqNr)
|
||||
}.recover {
|
||||
case e ⇒ ReplayMessagesFailure(e)
|
||||
}.pipeTo(persistentActor).onSuccess {
|
||||
case _ if publish ⇒ context.system.eventStream.publish(r)
|
||||
}
|
||||
|
||||
case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor) ⇒
|
||||
asyncDeleteMessagesTo(persistenceId, toSequenceNr) map {
|
||||
breaker.withCircuitBreaker(asyncDeleteMessagesTo(persistenceId, toSequenceNr)) map {
|
||||
case _ ⇒ DeleteMessagesSuccess(toSequenceNr)
|
||||
} recover {
|
||||
case e ⇒ DeleteMessagesFailure(e, toSequenceNr)
|
||||
|
|
@ -173,12 +190,16 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
|||
*
|
||||
* It is possible but not mandatory to reduce number of allocations by returning
|
||||
* `Future.successful(Nil)` for the happy path, i.e. when no messages are rejected.
|
||||
*
|
||||
* This call is protected with a circuit-breaker.
|
||||
*/
|
||||
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]
|
||||
|
||||
/**
|
||||
* Plugin API: asynchronously deletes all persistent messages up to `toSequenceNr`
|
||||
* (inclusive).
|
||||
*
|
||||
* This call is protected with a circuit-breaker.
|
||||
*/
|
||||
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit]
|
||||
|
||||
|
|
@ -187,6 +208,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
|||
*
|
||||
* Allows plugin implementers to use `f pipeTo self` and
|
||||
* handle additional messages for implementing advanced features
|
||||
*
|
||||
*/
|
||||
def receivePluginInternal: Actor.Receive = Actor.emptyBehavior
|
||||
//#journal-plugin-api
|
||||
|
|
|
|||
|
|
@ -5,11 +5,12 @@
|
|||
|
||||
package akka.persistence.snapshot
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.Future
|
||||
|
||||
import akka.actor._
|
||||
import akka.pattern.pipe
|
||||
import akka.persistence._
|
||||
import akka.pattern.CircuitBreaker
|
||||
|
||||
/**
|
||||
* Abstract snapshot store.
|
||||
|
|
@ -21,11 +22,24 @@ trait SnapshotStore extends Actor with ActorLogging {
|
|||
private val extension = Persistence(context.system)
|
||||
private val publish = extension.settings.internal.publishPluginCommands
|
||||
|
||||
private val breaker = {
|
||||
val cfg = context.system.settings.config
|
||||
val cbConfig =
|
||||
if (cfg.hasPath(self.path.name + ".circuit-breaker"))
|
||||
cfg.getConfig(self.path.name + ".circuit-breaker")
|
||||
.withFallback(cfg.getConfig("akka.persistence.default-circuit-breaker"))
|
||||
else cfg.getConfig("akka.persistence.default-circuit-breaker")
|
||||
val maxFailures = cbConfig.getInt("max-failures")
|
||||
val callTimeout = cbConfig.getDuration("call-timeout", MILLISECONDS).millis
|
||||
val resetTimeout = cbConfig.getDuration("reset-timeout", MILLISECONDS).millis
|
||||
CircuitBreaker(context.system.scheduler, maxFailures, callTimeout, resetTimeout)
|
||||
}
|
||||
|
||||
final def receive = receiveSnapshotStore.orElse[Any, Unit](receivePluginInternal)
|
||||
|
||||
final val receiveSnapshotStore: Actor.Receive = {
|
||||
case LoadSnapshot(persistenceId, criteria, toSequenceNr) ⇒
|
||||
loadAsync(persistenceId, criteria.limit(toSequenceNr)) map {
|
||||
breaker.withCircuitBreaker(loadAsync(persistenceId, criteria.limit(toSequenceNr))) map {
|
||||
sso ⇒ LoadSnapshotResult(sso, toSequenceNr)
|
||||
} recover {
|
||||
case e ⇒ LoadSnapshotResult(None, toSequenceNr)
|
||||
|
|
@ -33,7 +47,7 @@ trait SnapshotStore extends Actor with ActorLogging {
|
|||
|
||||
case SaveSnapshot(metadata, snapshot) ⇒
|
||||
val md = metadata.copy(timestamp = System.currentTimeMillis)
|
||||
saveAsync(md, snapshot) map {
|
||||
breaker.withCircuitBreaker(saveAsync(md, snapshot)) map {
|
||||
_ ⇒ SaveSnapshotSuccess(md)
|
||||
} recover {
|
||||
case e ⇒ SaveSnapshotFailure(metadata, e)
|
||||
|
|
@ -44,11 +58,11 @@ trait SnapshotStore extends Actor with ActorLogging {
|
|||
case evt @ SaveSnapshotFailure(metadata, _) ⇒
|
||||
try {
|
||||
tryReceivePluginInternal(evt)
|
||||
deleteAsync(metadata)
|
||||
breaker.withCircuitBreaker(deleteAsync(metadata))
|
||||
} finally senderPersistentActor() ! evt // sender is persistentActor
|
||||
|
||||
case d @ DeleteSnapshot(metadata) ⇒
|
||||
deleteAsync(metadata).map {
|
||||
breaker.withCircuitBreaker(deleteAsync(metadata)).map {
|
||||
case _ ⇒ DeleteSnapshotSuccess(metadata)
|
||||
}.recover {
|
||||
case e ⇒ DeleteSnapshotFailure(metadata, e)
|
||||
|
|
@ -62,7 +76,7 @@ trait SnapshotStore extends Actor with ActorLogging {
|
|||
try tryReceivePluginInternal(evt) finally senderPersistentActor() ! evt
|
||||
|
||||
case d @ DeleteSnapshots(persistenceId, criteria) ⇒
|
||||
deleteAsync(persistenceId, criteria).map {
|
||||
breaker.withCircuitBreaker(deleteAsync(persistenceId, criteria)).map {
|
||||
case _ ⇒ DeleteSnapshotsSuccess(criteria)
|
||||
}.recover {
|
||||
case e ⇒ DeleteSnapshotsFailure(criteria, e)
|
||||
|
|
@ -87,6 +101,8 @@ trait SnapshotStore extends Actor with ActorLogging {
|
|||
/**
|
||||
* Plugin API: asynchronously loads a snapshot.
|
||||
*
|
||||
* This call is protected with a circuit-breaker.
|
||||
*
|
||||
* @param persistenceId id of the persistent actor.
|
||||
* @param criteria selection criteria for loading.
|
||||
*/
|
||||
|
|
@ -95,6 +111,8 @@ trait SnapshotStore extends Actor with ActorLogging {
|
|||
/**
|
||||
* Plugin API: asynchronously saves a snapshot.
|
||||
*
|
||||
* This call is protected with a circuit-breaker.
|
||||
*
|
||||
* @param metadata snapshot metadata.
|
||||
* @param snapshot snapshot.
|
||||
*/
|
||||
|
|
@ -103,14 +121,17 @@ trait SnapshotStore extends Actor with ActorLogging {
|
|||
/**
|
||||
* Plugin API: deletes the snapshot identified by `metadata`.
|
||||
*
|
||||
* This call is protected with a circuit-breaker.
|
||||
*
|
||||
* @param metadata snapshot metadata.
|
||||
*/
|
||||
|
||||
def deleteAsync(metadata: SnapshotMetadata): Future[Unit]
|
||||
|
||||
/**
|
||||
* Plugin API: deletes all snapshots matching `criteria`.
|
||||
*
|
||||
* This call is protected with a circuit-breaker.
|
||||
*
|
||||
* @param persistenceId id of the persistent actor.
|
||||
* @param criteria selection criteria for deleting.
|
||||
*/
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue