recovery timeout for persistent actors #20698

This commit is contained in:
Johan Andrén 2016-06-03 14:17:41 +02:00
parent 32e72f8208
commit 896ea53dd3
6 changed files with 161 additions and 44 deletions

View file

@ -65,11 +65,13 @@ abstract class QuickRestartSpec
runOn(second) { runOn(second) {
restartingSystem = restartingSystem =
if (restartingSystem == null) if (restartingSystem == null)
ActorSystem(system.name, ActorSystem(
system.name,
ConfigFactory.parseString(s"akka.cluster.roles = [round-$n]") ConfigFactory.parseString(s"akka.cluster.roles = [round-$n]")
.withFallback(system.settings.config)) .withFallback(system.settings.config))
else else
ActorSystem(system.name, ActorSystem(
system.name,
ConfigFactory.parseString(s""" ConfigFactory.parseString(s"""
akka.cluster.roles = [round-$n] akka.cluster.roles = [round-$n]
akka.remote.netty.tcp.port = ${Cluster(restartingSystem).selfAddress.port.get}""") // same port akka.remote.netty.tcp.port = ${Cluster(restartingSystem).selfAddress.port.get}""") // same port

View file

@ -92,11 +92,11 @@ akka.persistence {
} }
} }
} }
# Fallback settings for journal plugin configurations. # Fallback settings for journal plugin configurations.
# These settings are used if they are not defined in plugin config section. # These settings are used if they are not defined in plugin config section.
journal-plugin-fallback { journal-plugin-fallback {
# Fully qualified class name providing journal plugin api implementation. # Fully qualified class name providing journal plugin api implementation.
# It is mandatory to specify this property. # It is mandatory to specify this property.
# The class must have a constructor without parameters or constructor with # The class must have a constructor without parameters or constructor with
@ -105,40 +105,46 @@ akka.persistence {
# Dispatcher for the plugin actor. # Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
# Dispatcher for message replay. # Dispatcher for message replay.
replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher" replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher"
# Removed: used to be the Maximum size of a persistent message batch written to the journal. # Removed: used to be the Maximum size of a persistent message batch written to the journal.
# Now this setting is without function, PersistentActor will write as many messages # Now this setting is without function, PersistentActor will write as many messages
# as it has accumulated since the last write. # as it has accumulated since the last write.
max-message-batch-size = 200 max-message-batch-size = 200
# If there is more time in between individual events gotten from the journal
# recovery than this the recovery will fail.
# Note that it also affects reading the snapshot before replaying events on
# top of it, even though it is configured for the journal.
recovery-event-timeout = 30s
circuit-breaker { circuit-breaker {
max-failures = 10 max-failures = 10
call-timeout = 10s call-timeout = 10s
reset-timeout = 30s reset-timeout = 30s
} }
# The replay filter can detect a corrupt event stream by inspecting # The replay filter can detect a corrupt event stream by inspecting
# sequence numbers and writerUuid when replaying events. # sequence numbers and writerUuid when replaying events.
replay-filter { replay-filter {
# What the filter should do when detecting invalid events. # What the filter should do when detecting invalid events.
# Supported values: # Supported values:
# `repair-by-discard-old` : discard events from old writers, # `repair-by-discard-old` : discard events from old writers,
# warning is logged # warning is logged
# `fail` : fail the replay, error is logged # `fail` : fail the replay, error is logged
# `warn` : log warning but emit events untouched # `warn` : log warning but emit events untouched
# `off` : disable this feature completely # `off` : disable this feature completely
mode = repair-by-discard-old mode = repair-by-discard-old
# It uses a look ahead buffer for analyzing the events. # It uses a look ahead buffer for analyzing the events.
# This defines the size (in number of events) of the buffer. # This defines the size (in number of events) of the buffer.
window-size = 100 window-size = 100
# How many old writerUuid to remember # How many old writerUuid to remember
max-old-writers = 10 max-old-writers = 10
# Set this to `on` to enable detailed debug logging of each # Set this to `on` to enable detailed debug logging of each
# replayed event. # replayed event.
debug = off debug = off
@ -148,8 +154,8 @@ akka.persistence {
# Fallback settings for snapshot store plugin configurations # Fallback settings for snapshot store plugin configurations
# These settings are used if they are not defined in plugin config section. # These settings are used if they are not defined in plugin config section.
snapshot-store-plugin-fallback { snapshot-store-plugin-fallback {
# Fully qualified class name providing snapshot store plugin api # Fully qualified class name providing snapshot store plugin api
# implementation. It is mandatory to specify this property if # implementation. It is mandatory to specify this property if
# snapshot store is enabled. # snapshot store is enabled.
# The class must have a constructor without parameters or constructor with # The class must have a constructor without parameters or constructor with
@ -158,7 +164,7 @@ akka.persistence {
# Dispatcher for the plugin actor. # Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
circuit-breaker { circuit-breaker {
max-failures = 5 max-failures = 5
call-timeout = 20s call-timeout = 20s

View file

@ -9,11 +9,13 @@ import java.util.UUID
import scala.collection.immutable import scala.collection.immutable
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.actor.DeadLetter import akka.actor.{ DeadLetter, ReceiveTimeout, StashOverflowException }
import akka.actor.StashOverflowException import akka.util.Helpers.ConfigOps
import akka.event.Logging import akka.event.Logging
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import scala.concurrent.duration.{ Duration, FiniteDuration }
/** /**
* INTERNAL API * INTERNAL API
*/ */
@ -461,6 +463,10 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
*/ */
private def recoveryStarted(replayMax: Long) = new State { private def recoveryStarted(replayMax: Long) = new State {
// protect against replay stalling forever because of journal overloaded and such
private val previousRecieveTimeout = context.receiveTimeout
context.setReceiveTimeout(extension.journalConfigFor(journalPluginId).getMillisDuration("recovery-event-timeout"))
private val recoveryBehavior: Receive = { private val recoveryBehavior: Receive = {
val _receiveRecover = receiveRecover val _receiveRecover = receiveRecover
@ -471,6 +477,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
_receiveRecover(s) _receiveRecover(s)
case RecoveryCompleted if _receiveRecover.isDefinedAt(RecoveryCompleted) case RecoveryCompleted if _receiveRecover.isDefinedAt(RecoveryCompleted)
_receiveRecover(RecoveryCompleted) _receiveRecover(RecoveryCompleted)
} }
} }
@ -485,8 +492,13 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
// Since we are recovering we can ignore the receive behavior from the stack // Since we are recovering we can ignore the receive behavior from the stack
Eventsourced.super.aroundReceive(recoveryBehavior, SnapshotOffer(metadata, snapshot)) Eventsourced.super.aroundReceive(recoveryBehavior, SnapshotOffer(metadata, snapshot))
} }
changeState(recovering(recoveryBehavior)) changeState(recovering(recoveryBehavior, previousRecieveTimeout))
journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self) journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self)
case ReceiveTimeout
try onRecoveryFailure(
new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within ${context.receiveTimeout.toSeconds}s"),
event = None)
finally context.stop(self)
case other case other
stashInternally(other) stashInternally(other)
} }
@ -502,32 +514,45 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
* *
* All incoming messages are stashed. * All incoming messages are stashed.
*/ */
private def recovering(recoveryBehavior: Receive) = new State { private def recovering(recoveryBehavior: Receive, previousReceiveTimeout: Duration) =
override def toString: String = "replay started" new State {
override def recoveryRunning: Boolean = true override def toString: String = "replay started"
override def stateReceive(receive: Receive, message: Any) = message match { override def recoveryRunning: Boolean = true
case ReplayedMessage(p)
try { override def stateReceive(receive: Receive, message: Any) = message match {
updateLastSequenceNr(p) case ReplayedMessage(p)
Eventsourced.super.aroundReceive(recoveryBehavior, p) try {
} catch { updateLastSequenceNr(p)
case NonFatal(t) Eventsourced.super.aroundReceive(recoveryBehavior, p)
try onRecoveryFailure(t, Some(p.payload)) finally context.stop(self) } catch {
} case NonFatal(t)
case RecoverySuccess(highestSeqNr) try onRecoveryFailure(t, Some(p.payload)) finally context.stop(self)
onReplaySuccess() // callback for subclass implementation }
changeState(processingCommands) case RecoverySuccess(highestSeqNr)
sequenceNr = highestSeqNr resetRecieveTimeout()
setLastSequenceNr(highestSeqNr) onReplaySuccess() // callback for subclass implementation
internalStash.unstashAll() changeState(processingCommands)
Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted) sequenceNr = highestSeqNr
case ReplayMessagesFailure(cause) setLastSequenceNr(highestSeqNr)
try onRecoveryFailure(cause, event = None) finally context.stop(self) internalStash.unstashAll()
case other Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted)
stashInternally(other) case ReplayMessagesFailure(cause)
resetRecieveTimeout()
try onRecoveryFailure(cause, event = None) finally context.stop(self)
case ReceiveTimeout
try onRecoveryFailure(
new RecoveryTimedOut(s"Recovery timed out, didn't get event within ${context.receiveTimeout.toSeconds}s, highest sequence number seen ${sequenceNr}"),
event = None)
finally context.stop(self)
case other
stashInternally(other)
}
private def resetRecieveTimeout(): Unit = {
context.setReceiveTimeout(previousReceiveTimeout)
}
} }
}
private def flushBatch() { private def flushBatch() {
if (eventBatch.nonEmpty) { if (eventBatch.nonEmpty) {

View file

@ -6,15 +6,18 @@ package akka.persistence
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import java.util.function.Consumer import java.util.function.Consumer
import akka.actor._ import akka.actor._
import akka.event.{ Logging, LoggingAdapter } import akka.event.{ Logging, LoggingAdapter }
import akka.persistence.journal.{ EventAdapters, IdentityEventAdapters } import akka.persistence.journal.{ EventAdapters, IdentityEventAdapters }
import akka.util.Collections.EmptyImmutableSeq import akka.util.Collections.EmptyImmutableSeq
import akka.util.Helpers.ConfigOps import akka.util.Helpers.ConfigOps
import com.typesafe.config.Config import com.typesafe.config.Config
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.util.Reflect import akka.util.Reflect
import scala.util.control.NonFatal import scala.util.control.NonFatal
/** /**

View file

@ -4,11 +4,14 @@
package akka.persistence package akka.persistence
import java.lang.{ Iterable JIterable } import java.lang.{ Iterable JIterable }
import akka.actor._ import akka.actor._
import akka.japi.Procedure import akka.japi.Procedure
import akka.japi.Util import akka.japi.Util
import com.typesafe.config.Config import com.typesafe.config.Config
import scala.util.control.NoStackTrace
abstract class RecoveryCompleted abstract class RecoveryCompleted
/** /**
* Sent to a [[PersistentActor]] when the journal replay has been finished. * Sent to a [[PersistentActor]] when the journal replay has been finished.
@ -98,6 +101,8 @@ object Recovery {
val none: Recovery = Recovery(toSequenceNr = 0L) val none: Recovery = Recovery(toSequenceNr = 0L)
} }
final class RecoveryTimedOut(message: String) extends RuntimeException(message) with NoStackTrace
/** /**
* This defines how to handle the current received message which failed to stash, when the size of * This defines how to handle the current received message which failed to stash, when the size of
* Stash exceeding the capacity of Stash. * Stash exceeding the capacity of Stash.

View file

@ -0,0 +1,76 @@
package akka.persistence
import akka.actor.Status.Failure
import akka.actor.{ Actor, ActorRef, Props }
import akka.persistence.journal.SteppingInmemJournal
import akka.testkit.{ AkkaSpec, ImplicitSender, TestProbe }
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
object PersistentActorRecoveryTimeoutSpec {
val journalId = "persistent-actor-recovery-timeout-spec"
def config =
SteppingInmemJournal.config(PersistentActorRecoveryTimeoutSpec.journalId).withFallback(
ConfigFactory.parseString(
"""
|akka.persistence.journal.stepping-inmem.recovery-event-timeout=100ms
""".stripMargin)).withFallback(PersistenceSpec.config("stepping-inmem", "PersistentActorRecoveryTimeoutSpec"))
class TestActor(probe: ActorRef) extends NamedPersistentActor("recovery-timeout-actor") {
override def receiveRecover: Receive = Actor.emptyBehavior
override def receiveCommand: Receive = {
case x persist(x) { _
sender() ! x
}
}
override protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit = {
probe ! Failure(cause)
}
}
}
class PersistentActorRecoveryTimeoutSpec extends AkkaSpec(PersistentActorRecoveryTimeoutSpec.config) with ImplicitSender {
import PersistentActorRecoveryTimeoutSpec.journalId
"The recovery timeout" should {
"fail recovery if timeout is not met when recovering" in {
val probe = TestProbe()
val persisting = system.actorOf(Props(classOf[PersistentActorRecoveryTimeoutSpec.TestActor], probe.ref))
awaitAssert(SteppingInmemJournal.getRef(journalId), 3.seconds)
val journal = SteppingInmemJournal.getRef(journalId)
// initial read highest
SteppingInmemJournal.step(journal)
persisting ! "A"
SteppingInmemJournal.step(journal)
expectMsg("A")
watch(persisting)
system.stop(persisting)
expectTerminated(persisting)
// now replay, but don't give the journal any tokens to replay events
// so that we cause the timeout to trigger
val replaying = system.actorOf(Props(classOf[PersistentActorRecoveryTimeoutSpec.TestActor], probe.ref))
watch(replaying)
// initial read highest
SteppingInmemJournal.step(journal)
probe.expectMsgType[Failure].cause shouldBe a[RecoveryTimedOut]
expectTerminated(replaying)
}
}
}