diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index da25fcd62b..fade70fa4d 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -80,6 +80,12 @@ akka.persistence { } } } + # CircuitBreaker setting if not defined in plugin config section named circuit-breaker + default-circuit-breaker { + max-failures = 10 + call-timeout = 10s + reset-timeout = 30s + } } # Protobuf serialization for the persistent extension messages. @@ -124,6 +130,12 @@ akka.persistence.snapshot-store.local { # yet older snapshot files are available. Each recovery attempt will try # to recover using an older than previously failed-on snapshot file (if any are present). max-load-attempts = 3 + # CircuitBreaker settings + circuit-breaker { + max-failures = 5 + call-timeout = 20s + reset-timeout = 60s + } } # LevelDB journal plugin. @@ -143,6 +155,12 @@ akka.persistence.journal.leveldb { checksum = off # Native LevelDB (via JNI) or LevelDB Java port. native = on + # CircuitBreaker settings + circuit-breaker { + max-failures = 10 + call-timeout = 10s + reset-timeout = 30s + } } # Shared LevelDB journal plugin (for testing only). diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index c6cfd7f9f7..2752e11a5f 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -188,13 +188,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { case Some(extensionId) ⇒ extensionId(system).adapters case None ⇒ - val extensionId = new ExtensionId[PluginHolder] { - override def createExtension(system: ExtendedActorSystem): PluginHolder = { - val plugin = createPlugin(configPath)(journalDispatchSelector) - val adapters = createAdapters(configPath) - PluginHolder(plugin, adapters) - } - } + val extensionId = new PluginHolderExtensionId(configPath) journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) adaptersFor(journalPluginId) // Recursive invocation. } @@ -226,13 +220,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { case Some(extensionId) ⇒ extensionId(system).actor case None ⇒ - val extensionId = new ExtensionId[PluginHolder] { - override def createExtension(system: ExtendedActorSystem): PluginHolder = { - val plugin = createPlugin(configPath)(journalDispatchSelector) - val adapters = createAdapters(configPath) - PluginHolder(plugin, adapters) - } - } + val extensionId = new PluginHolderExtensionId(configPath) journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) journalFor(journalPluginId) // Recursive invocation. } @@ -251,13 +239,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { case Some(extensionId) ⇒ extensionId(system).actor case None ⇒ - val extensionId = new ExtensionId[PluginHolder] { - override def createExtension(system: ExtendedActorSystem): PluginHolder = { - val plugin = createPlugin(configPath)(snapshotDispatchSelector) - val adapters = createAdapters(configPath) - PluginHolder(plugin, adapters) - } - } + val extensionId = new PluginHolderExtensionId(configPath) snapshotPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) snapshotStoreFor(snapshotPluginId) // Recursive invocation. } @@ -288,4 +270,12 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { private def id(ref: ActorRef) = ref.path.toStringWithoutAddress + private class PluginHolderExtensionId(configPath: String) extends ExtensionId[PluginHolder] { + override def createExtension(system: ExtendedActorSystem): PluginHolder = { + val plugin = createPlugin(configPath)(journalDispatchSelector) + val adapters = createAdapters(configPath) + PluginHolder(plugin, adapters) + } + } + } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala index 5c7e14877f..c880045a71 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala @@ -27,6 +27,12 @@ trait AsyncRecovery { * The `toSequenceNr` is the lowest of what was returned by [[#asyncReadHighestSequenceNr]] * and what the user specified as recovery [[akka.persistence.Recovery]] parameter. * + * This call is NOT protected with a circuit-breaker because it may take long time + * to replay all events. The plugin implementation itself must protect against + * an unresponsive backend store and make sure that the returned Future is + * completed with success or failure within reasonable time. It is not allowed + * to ignore completing the future. + * * @param persistenceId persistent actor id. * @param fromSequenceNr sequence number where replay should start (inclusive). * @param toSequenceNr sequence number where replay should end (inclusive). @@ -46,6 +52,8 @@ trait AsyncRecovery { * This sequence number is also used as `toSequenceNr` in subsequent call * to [[#asyncReplayMessages]] unless the user has specified a lower `toSequenceNr`. * + * This call is protected with a circuit-breaker. + * * @param persistenceId persistent actor id. * @param fromSequenceNr hint where to start searching for the highest sequence * number. diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index f7f276b6e7..a9b06dd751 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -5,14 +5,15 @@ package akka.persistence.journal +import scala.concurrent.duration._ import akka.actor._ import akka.pattern.pipe import akka.persistence._ - import scala.collection.immutable import scala.concurrent.Future import scala.util.{ Failure, Success, Try } import scala.util.control.NonFatal +import akka.pattern.CircuitBreaker /** * Abstract journal, optimized for asynchronous, non-blocking writes. @@ -28,6 +29,19 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { private val resequencer = context.actorOf(Props[Resequencer]()) private var resequencerCounter = 1L + private val breaker = { + val cfg = context.system.settings.config + val cbConfig = + if (cfg.hasPath(self.path.name + ".circuit-breaker")) + cfg.getConfig(self.path.name + ".circuit-breaker") + .withFallback(cfg.getConfig("akka.persistence.default-circuit-breaker")) + else cfg.getConfig("akka.persistence.default-circuit-breaker") + val maxFailures = cbConfig.getInt("max-failures") + val callTimeout = cbConfig.getDuration("call-timeout", MILLISECONDS).millis + val resetTimeout = cbConfig.getDuration("reset-timeout", MILLISECONDS).millis + CircuitBreaker(context.system.scheduler, maxFailures, callTimeout, resetTimeout) + } + final def receive = receiveWriteJournal.orElse[Any, Unit](receivePluginInternal) final val receiveWriteJournal: Actor.Receive = { @@ -39,8 +53,9 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { val prepared = Try(preparePersistentBatch(messages)) val writeResult = (prepared match { case Success(prep) ⇒ - // in case the asyncWriteMessages throws - try asyncWriteMessages(prep) catch { case NonFatal(e) ⇒ Future.failed(e) } + // try in case the asyncWriteMessages throws + try breaker.withCircuitBreaker(asyncWriteMessages(prep)) + catch { case NonFatal(e) ⇒ Future.failed(e) } case f @ Failure(_) ⇒ // exception from preparePersistentBatch => rejected Future.successful(messages.collect { case a: AtomicWrite ⇒ f }) @@ -96,30 +111,32 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor) ⇒ - asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).flatMap { highSeqNr ⇒ - val toSeqNr = math.min(toSequenceNr, highSeqNr) - if (highSeqNr == 0L || fromSequenceNr > toSeqNr) - Future.successful(highSeqNr) - else { - // Send replayed messages and replay result to persistentActor directly. No need - // to resequence replayed messages relative to written and looped messages. - asyncReplayMessages(persistenceId, fromSequenceNr, toSeqNr, max) { p ⇒ - if (!p.deleted) // old records from 2.3 may still have the deleted flag - adaptFromJournal(p).foreach { adaptedPersistentRepr ⇒ - persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender) - } - }.map(_ ⇒ highSeqNr) + breaker.withCircuitBreaker(asyncReadHighestSequenceNr(persistenceId, fromSequenceNr)) + .flatMap { highSeqNr ⇒ + val toSeqNr = math.min(toSequenceNr, highSeqNr) + if (highSeqNr == 0L || fromSequenceNr > toSeqNr) + Future.successful(highSeqNr) + else { + // Send replayed messages and replay result to persistentActor directly. No need + // to resequence replayed messages relative to written and looped messages. + // not possible to use circuit breaker here + asyncReplayMessages(persistenceId, fromSequenceNr, toSeqNr, max) { p ⇒ + if (!p.deleted) // old records from 2.3 may still have the deleted flag + adaptFromJournal(p).foreach { adaptedPersistentRepr ⇒ + persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender) + } + }.map(_ ⇒ highSeqNr) + } + }.map { + highSeqNr ⇒ RecoverySuccess(highSeqNr) + }.recover { + case e ⇒ ReplayMessagesFailure(e) + }.pipeTo(persistentActor).onSuccess { + case _ if publish ⇒ context.system.eventStream.publish(r) } - }.map { - highSeqNr ⇒ RecoverySuccess(highSeqNr) - }.recover { - case e ⇒ ReplayMessagesFailure(e) - }.pipeTo(persistentActor).onSuccess { - case _ if publish ⇒ context.system.eventStream.publish(r) - } case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor) ⇒ - asyncDeleteMessagesTo(persistenceId, toSequenceNr) map { + breaker.withCircuitBreaker(asyncDeleteMessagesTo(persistenceId, toSequenceNr)) map { case _ ⇒ DeleteMessagesSuccess(toSequenceNr) } recover { case e ⇒ DeleteMessagesFailure(e, toSequenceNr) @@ -173,12 +190,16 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { * * It is possible but not mandatory to reduce number of allocations by returning * `Future.successful(Nil)` for the happy path, i.e. when no messages are rejected. + * + * This call is protected with a circuit-breaker. */ def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] /** * Plugin API: asynchronously deletes all persistent messages up to `toSequenceNr` * (inclusive). + * + * This call is protected with a circuit-breaker. */ def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] @@ -187,6 +208,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { * * Allows plugin implementers to use `f pipeTo self` and * handle additional messages for implementing advanced features + * */ def receivePluginInternal: Actor.Receive = Actor.emptyBehavior //#journal-plugin-api diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala index 8305fc93c7..0e29191e4e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala @@ -5,11 +5,12 @@ package akka.persistence.snapshot +import scala.concurrent.duration._ import scala.concurrent.Future - import akka.actor._ import akka.pattern.pipe import akka.persistence._ +import akka.pattern.CircuitBreaker /** * Abstract snapshot store. @@ -21,11 +22,24 @@ trait SnapshotStore extends Actor with ActorLogging { private val extension = Persistence(context.system) private val publish = extension.settings.internal.publishPluginCommands + private val breaker = { + val cfg = context.system.settings.config + val cbConfig = + if (cfg.hasPath(self.path.name + ".circuit-breaker")) + cfg.getConfig(self.path.name + ".circuit-breaker") + .withFallback(cfg.getConfig("akka.persistence.default-circuit-breaker")) + else cfg.getConfig("akka.persistence.default-circuit-breaker") + val maxFailures = cbConfig.getInt("max-failures") + val callTimeout = cbConfig.getDuration("call-timeout", MILLISECONDS).millis + val resetTimeout = cbConfig.getDuration("reset-timeout", MILLISECONDS).millis + CircuitBreaker(context.system.scheduler, maxFailures, callTimeout, resetTimeout) + } + final def receive = receiveSnapshotStore.orElse[Any, Unit](receivePluginInternal) final val receiveSnapshotStore: Actor.Receive = { case LoadSnapshot(persistenceId, criteria, toSequenceNr) ⇒ - loadAsync(persistenceId, criteria.limit(toSequenceNr)) map { + breaker.withCircuitBreaker(loadAsync(persistenceId, criteria.limit(toSequenceNr))) map { sso ⇒ LoadSnapshotResult(sso, toSequenceNr) } recover { case e ⇒ LoadSnapshotResult(None, toSequenceNr) @@ -33,7 +47,7 @@ trait SnapshotStore extends Actor with ActorLogging { case SaveSnapshot(metadata, snapshot) ⇒ val md = metadata.copy(timestamp = System.currentTimeMillis) - saveAsync(md, snapshot) map { + breaker.withCircuitBreaker(saveAsync(md, snapshot)) map { _ ⇒ SaveSnapshotSuccess(md) } recover { case e ⇒ SaveSnapshotFailure(metadata, e) @@ -44,11 +58,11 @@ trait SnapshotStore extends Actor with ActorLogging { case evt @ SaveSnapshotFailure(metadata, _) ⇒ try { tryReceivePluginInternal(evt) - deleteAsync(metadata) + breaker.withCircuitBreaker(deleteAsync(metadata)) } finally senderPersistentActor() ! evt // sender is persistentActor case d @ DeleteSnapshot(metadata) ⇒ - deleteAsync(metadata).map { + breaker.withCircuitBreaker(deleteAsync(metadata)).map { case _ ⇒ DeleteSnapshotSuccess(metadata) }.recover { case e ⇒ DeleteSnapshotFailure(metadata, e) @@ -62,7 +76,7 @@ trait SnapshotStore extends Actor with ActorLogging { try tryReceivePluginInternal(evt) finally senderPersistentActor() ! evt case d @ DeleteSnapshots(persistenceId, criteria) ⇒ - deleteAsync(persistenceId, criteria).map { + breaker.withCircuitBreaker(deleteAsync(persistenceId, criteria)).map { case _ ⇒ DeleteSnapshotsSuccess(criteria) }.recover { case e ⇒ DeleteSnapshotsFailure(criteria, e) @@ -87,6 +101,8 @@ trait SnapshotStore extends Actor with ActorLogging { /** * Plugin API: asynchronously loads a snapshot. * + * This call is protected with a circuit-breaker. + * * @param persistenceId id of the persistent actor. * @param criteria selection criteria for loading. */ @@ -95,6 +111,8 @@ trait SnapshotStore extends Actor with ActorLogging { /** * Plugin API: asynchronously saves a snapshot. * + * This call is protected with a circuit-breaker. + * * @param metadata snapshot metadata. * @param snapshot snapshot. */ @@ -103,14 +121,17 @@ trait SnapshotStore extends Actor with ActorLogging { /** * Plugin API: deletes the snapshot identified by `metadata`. * + * This call is protected with a circuit-breaker. + * * @param metadata snapshot metadata. */ - def deleteAsync(metadata: SnapshotMetadata): Future[Unit] /** * Plugin API: deletes all snapshots matching `criteria`. * + * This call is protected with a circuit-breaker. + * * @param persistenceId id of the persistent actor. * @param criteria selection criteria for deleting. */