Fix recovery timeout in Typed PersistentBehavior, #25268

* The TimerMsg was wrapped in IncomingCommand and therefore stashed,
  and when unstashed causing the ClassCastException
* Solved by not using timers here but plain scheduler
* Also fixing journalPluginId and snapshotPluginId
This commit is contained in:
Patrik Nordwall 2018-07-06 23:26:36 +02:00
parent 836347fe08
commit 67cc779ca6
12 changed files with 180 additions and 139 deletions

View file

@ -110,6 +110,9 @@ object Behavior {
* // all other kinds of Number will be `unhandled`
* }
* }}}
*
* Scheduled messages via [[akka.actor.typed.scaladsl.TimerScheduler]] can currently
* not be used together with `widen`, see issue #25318.
*/
def widen[U](matcher: PartialFunction[U, T]): Behavior[U] =
BehaviorImpl.widened(behavior, matcher)

View file

@ -9,9 +9,10 @@ import akka.util.{ ConstantFun, LineNumbers }
import akka.annotation.InternalApi
import akka.actor.typed.{ ActorContext AC }
import akka.actor.typed.scaladsl.{ ActorContext SAC }
import scala.reflect.ClassTag
import akka.actor.typed.internal.TimerSchedulerImpl.TimerMsg
/**
* INTERNAL API
*/
@ -45,11 +46,18 @@ import scala.reflect.ClassTag
override def receiveSignal(ctx: AC[U], signal: Signal): Behavior[U] =
widen(Behavior.interpretSignal(behavior, ctx.as[T], signal), ctx.as[T])
override def receive(ctx: AC[U], msg: U): Behavior[U] =
override def receive(ctx: AC[U], msg: U): Behavior[U] = {
// widen would wrap the TimerMessage, which would be wrong, see issue #25318
msg match {
case t: TimerMsg throw new IllegalArgumentException(
s"Timers and widen can't be used together, [${t.key}]. See issue #25318")
case _
}
matcher.applyOrElse(msg, any2null) match {
case null unhandled
case transformed widen(Behavior.interpretMessage(behavior, ctx.as[T], transformed), ctx.as[T])
}
}
override def toString: String = s"${behavior.toString}.widen(${LineNumbers(matcher)})"
}

View file

@ -255,6 +255,9 @@ object Behaviors {
* );
* }}}
*
* Scheduled messages via [[TimerScheduler]] can currently not be used
* together with `widen`, see issue #25318.
*
* @param behavior
* the behavior that will receive the selected messages
* @param selector

View file

@ -56,7 +56,8 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set
def createBehavior(state: ReplayingState[S]): Behavior[InternalProtocol] = {
Behaviors.setup { _
startRecoveryTimer(setup.timers, setup.settings.recoveryEventTimeout)
// protect against event recovery stalling forever because of journal overloaded and such
setup.startRecoveryTimer(snapshot = false)
replayEvents(state.seqNr + 1L, state.toSeqNr)
@ -92,8 +93,6 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set
}
case RecoverySuccess(highestSeqNr)
setup.log.debug("Recovery successful, recovered until sequenceNr: [{}]", highestSeqNr)
cancelRecoveryTimer(setup.timers)
onRecoveryCompleted(state)
case ReplayMessagesFailure(cause)
@ -119,7 +118,6 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set
if (state.eventSeenInInterval) {
stay(state.copy(eventSeenInInterval = false))
} else {
cancelRecoveryTimer(setup.timers)
val msg = s"Replay timed out, didn't get event within ]${setup.settings.recoveryEventTimeout}], highest sequence number seen [${state.seqNr}]"
onRecoveryFailure(new RecoveryTimedOut(msg), state.seqNr, None)
}
@ -142,7 +140,7 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set
* @param message the message that was being processed when the exception was thrown
*/
protected def onRecoveryFailure(cause: Throwable, sequenceNr: Long, message: Option[Any]): Behavior[InternalProtocol] = {
cancelRecoveryTimer(setup.timers)
setup.cancelRecoveryTimer()
tryReturnRecoveryPermit("on replay failure: " + cause.getMessage)
val msg = message match {
@ -166,14 +164,8 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set
tryUnstash(running)
} finally {
cancelRecoveryTimer(setup.timers)
setup.cancelRecoveryTimer()
}
// protect against event recovery stalling forever because of journal overloaded and such
private val EventRecoveryTickTimerKey = "event-recovery-tick"
private def startRecoveryTimer(timers: TimerScheduler[InternalProtocol], timeout: FiniteDuration): Unit =
timers.startPeriodicTimer(EventRecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), timeout)
private def cancelRecoveryTimer(timers: TimerScheduler[InternalProtocol]): Unit = timers.cancel(EventRecoveryTickTimerKey)
}

View file

@ -41,7 +41,8 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E
extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] {
def createBehavior(): Behavior[InternalProtocol] = {
startRecoveryTimer()
// protect against snapshot stalling forever because of journal overloaded and such
setup.startRecoveryTimer(snapshot = true)
loadSnapshot(setup.recovery.fromSnapshot, setup.recovery.toSequenceNr)
@ -63,7 +64,7 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E
* @param event the event that was processed in `receiveRecover`, if the exception was thrown there
*/
private def onRecoveryFailure(cause: Throwable, event: Option[Any]): Behavior[InternalProtocol] = {
cancelRecoveryTimer(setup.timers)
setup.cancelRecoveryTimer()
event match {
case Some(evt)
@ -108,7 +109,6 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E
becomeReplayingEvents(state, seqNr, toSnr)
case LoadSnapshotFailed(cause)
cancelRecoveryTimer(setup.timers)
onRecoveryFailure(cause, event = None)
case _
@ -117,7 +117,7 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E
}
private def becomeReplayingEvents(state: S, lastSequenceNr: Long, toSnr: Long): Behavior[InternalProtocol] = {
cancelRecoveryTimer(setup.timers)
setup.cancelRecoveryTimer()
EventsourcedReplayingEvents[C, E, S](
setup,
@ -125,10 +125,4 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E
)
}
// protect against snapshot stalling forever because of journal overloaded and such
private val SnapRecoveryTickTimerKey = "snapshot-recovery-tick"
private def startRecoveryTimer(): Unit =
setup.timers.startPeriodicTimer(SnapRecoveryTickTimerKey, RecoveryTickEvent(snapshot = true), setup.settings.recoveryEventTimeout)
private def cancelRecoveryTimer(timers: TimerScheduler[_]): Unit = timers.cancel(SnapRecoveryTickTimerKey)
}

View file

@ -13,31 +13,15 @@ import com.typesafe.config.Config
import scala.concurrent.duration._
/** INTERNAL API */
@InternalApi
private[akka] trait EventsourcedSettings {
def stashCapacity: Int
def logOnStashing: Boolean
def stashOverflowStrategyConfigurator: String
def recoveryEventTimeout: FiniteDuration
def journalPluginId: String
def withJournalPluginId(id: String): EventsourcedSettings
def snapshotPluginId: String
def withSnapshotPluginId(id: String): EventsourcedSettings
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object EventsourcedSettings {
def apply(system: ActorSystem[_]): EventsourcedSettings =
apply(system.settings.config)
def apply(system: ActorSystem[_], journalPluginId: String, snapshotPluginId: String): EventsourcedSettings =
apply(system.settings.config, journalPluginId, snapshotPluginId)
def apply(config: Config): EventsourcedSettings = {
def apply(config: Config, journalPluginId: String, snapshotPluginId: String): EventsourcedSettings = {
val typedConfig = config.getConfig("akka.persistence.typed")
// StashOverflowStrategy
@ -48,13 +32,17 @@ private[akka] trait EventsourcedSettings {
val logOnStashing = typedConfig.getBoolean("log-stashing")
EventsourcedSettingsImpl(
config,
val journalConfig = journalConfigFor(config, journalPluginId)
val recoveryEventTimeout: FiniteDuration =
journalConfig.getDuration("recovery-event-timeout", TimeUnit.MILLISECONDS).millis
EventsourcedSettings(
stashCapacity = stashCapacity,
stashOverflowStrategyConfigurator,
logOnStashing = logOnStashing,
journalPluginId = "",
snapshotPluginId = ""
recoveryEventTimeout,
journalPluginId,
snapshotPluginId
)
}
@ -71,26 +59,16 @@ private[akka] trait EventsourcedSettings {
}
@InternalApi
private[persistence] final case class EventsourcedSettingsImpl(
private val config: Config,
private[akka] final case class EventsourcedSettings(
stashCapacity: Int,
stashOverflowStrategyConfigurator: String,
logOnStashing: Boolean,
recoveryEventTimeout: FiniteDuration,
journalPluginId: String,
snapshotPluginId: String
) extends EventsourcedSettings {
snapshotPluginId: String) {
def withJournalPluginId(id: String): EventsourcedSettings = {
require(id != null, "journal plugin id must not be null; use empty string for 'default' journal")
copy(journalPluginId = id)
}
def withSnapshotPluginId(id: String): EventsourcedSettings = {
require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store")
copy(snapshotPluginId = id)
}
private val journalConfig = EventsourcedSettings.journalConfigFor(config, journalPluginId)
val recoveryEventTimeout = journalConfig.getDuration("recovery-event-timeout", TimeUnit.MILLISECONDS).millis
require(journalPluginId != null, "journal plugin id must not be null; use empty string for 'default' journal")
require(snapshotPluginId != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store")
}

View file

@ -4,6 +4,8 @@
package akka.persistence.typed.internal
import scala.concurrent.ExecutionContext
import akka.Done
import akka.actor.typed.Logger
import akka.actor.{ ActorRef, ExtendedActorSystem }
@ -16,16 +18,17 @@ import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol,
import akka.persistence.typed.scaladsl.PersistentBehaviors
import akka.util.Collections.EmptyImmutableSeq
import akka.util.OptionVal
import scala.util.Try
import akka.actor.Cancellable
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.RecoveryTickEvent
/**
* INTERNAL API: Carry state for the Persistent behavior implementation behaviors
*/
@InternalApi
private[persistence] final class EventsourcedSetup[C, E, S](
val context: ActorContext[InternalProtocol],
val timers: TimerScheduler[InternalProtocol],
val persistenceId: String,
val emptyState: S,
val commandHandler: PersistentBehaviors.CommandHandler[C, E, S],
@ -83,5 +86,28 @@ private[persistence] final class EventsourcedSetup[C, E, S](
this
}
private var recoveryTimer: OptionVal[Cancellable] = OptionVal.None
def startRecoveryTimer(snapshot: Boolean): Unit = {
cancelRecoveryTimer()
implicit val ec: ExecutionContext = context.executionContext
val timer =
if (snapshot)
context.system.scheduler.scheduleOnce(settings.recoveryEventTimeout, context.self.toUntyped,
RecoveryTickEvent(snapshot = true))
else
context.system.scheduler.schedule(settings.recoveryEventTimeout, settings.recoveryEventTimeout,
context.self.toUntyped, RecoveryTickEvent(snapshot = false))
recoveryTimer = OptionVal.Some(timer)
}
def cancelRecoveryTimer(): Unit = {
recoveryTimer match {
case OptionVal.Some(t) t.cancel()
case OptionVal.None
}
recoveryTimer = OptionVal.None
}
}

View file

@ -4,18 +4,17 @@
package akka.persistence.typed.internal
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer }
import akka.actor.typed.{ PostStop, Signal }
import akka.actor.typed.scaladsl.StashBuffer
import akka.annotation.InternalApi
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import akka.util.OptionVal
/**
* INTERNAL API
* Main reason for introduction of this trait is stash buffer reference management
* in order to survive restart of internal behavior
*/
@InternalApi
trait EventsourcedStashReferenceManagement {
@InternalApi private[akka] trait EventsourcedStashReferenceManagement {
private var stashBuffer: OptionVal[StashBuffer[InternalProtocol]] = OptionVal.None
@ -28,12 +27,5 @@ trait EventsourcedStashReferenceManagement {
stashBuffer.get
}
def onSignalCleanup: (ActorContext[InternalProtocol], Signal) Unit = {
case (ctx, PostStop)
stashBuffer match {
case OptionVal.Some(buffer) buffer.unstashAll(ctx, Behaviors.ignore)
case _ Unit
}
case _ Unit
}
def clearStashBuffer(): Unit = stashBuffer = OptionVal.None
}

View file

@ -14,9 +14,10 @@ import akka.persistence.typed.{ EventAdapter, NoOpEventAdapter }
import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity }
import akka.persistence.typed.scaladsl._
import akka.util.ConstantFun
import scala.util.{ Failure, Success, Try }
import akka.actor.typed.PostStop
@InternalApi
private[akka] object PersistentBehaviorImpl {
@ -40,7 +41,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
snapshotPluginId: Option[String] = None,
recoveryCompleted: (ActorContext[Command], State) Unit = ConstantFun.scalaAnyTwoToUnit,
tagger: Event Set[String] = (_: Event) Set.empty[String],
eventAdapter: EventAdapter[Event, _] = NoOpEventAdapter.instance[Event],
eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event],
snapshotWhen: (State, Event, Long) Boolean = ConstantFun.scalaAnyThreeToFalse,
recovery: Recovery = Recovery(),
supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop,
@ -48,43 +49,47 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
) extends PersistentBehavior[Command, Event, State] with EventsourcedStashReferenceManagement {
override def apply(context: typed.ActorContext[Command]): Behavior[Command] = {
Behaviors.supervise(
Behaviors.setup[EventsourcedBehavior.InternalProtocol] { ctx
Behaviors.withTimers { timers
val settings = EventsourcedSettings(ctx.system)
val internalStash = stashBuffer(settings)
Behaviors.tap(
onMessage = (_, _) Unit,
onSignal = onSignalCleanup,
behavior = {
val setup = new EventsourcedSetup(
ctx,
timers,
persistenceId,
emptyState,
commandHandler,
eventHandler,
WriterIdentity.newIdentity(),
recoveryCompleted,
onSnapshot,
tagger,
eventAdapter,
snapshotWhen,
recovery,
holdingRecoveryPermit = false,
settings = settings,
internalStash = internalStash
)
EventsourcedRequestingRecoveryPermit(setup)
}
)
}
Behaviors.supervise {
Behaviors.setup[InternalProtocol] { ctx
val settings = EventsourcedSettings(ctx.system, journalPluginId.getOrElse(""), snapshotPluginId.getOrElse(""))
val internalStash = stashBuffer(settings)
val eventsourcedSetup = new EventsourcedSetup(
ctx,
persistenceId,
emptyState,
commandHandler,
eventHandler,
WriterIdentity.newIdentity(),
recoveryCompleted,
onSnapshot,
tagger,
eventAdapter,
snapshotWhen,
recovery,
holdingRecoveryPermit = false,
settings = settings,
internalStash = internalStash
)
Behaviors.tap(EventsourcedRequestingRecoveryPermit(eventsourcedSetup))(
onMessage = (_, _) Unit,
onSignal = {
case (_, PostStop)
eventsourcedSetup.cancelRecoveryTimer()
clearStashBuffer()
case _
})
}.widen[Any] {
case res: JournalProtocol.Response InternalProtocol.JournalResponse(res)
case res: SnapshotProtocol.Response InternalProtocol.SnapshotterResponse(res)
case RecoveryPermitter.RecoveryPermitGranted InternalProtocol.RecoveryPermitGranted
case internal: InternalProtocol internal // such as RecoveryTickEvent
case cmd: Command @unchecked InternalProtocol.IncomingCommand(cmd)
}.narrow[Command]).onFailure[JournalFailureException](supervisionStrategy)
}.narrow[Command]
}.onFailure[JournalFailureException](supervisionStrategy)
}
/**
@ -153,7 +158,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
* the journal understands
*/
def eventAdapter(adapter: EventAdapter[Event, _]): PersistentBehavior[Command, Event, State] =
copy(eventAdapter = adapter)
copy(eventAdapter = adapter.asInstanceOf[EventAdapter[Event, Any]])
/**
* The `callback` function is called to notify the actor that a snapshot has finished

View file

@ -9,9 +9,10 @@ import akka.actor.typed.{ Behavior, Signal, TypedAkkaSpecWithShutdown }
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.{ IncomingCommand, RecoveryPermitGranted }
import akka.actor.testkit.typed.scaladsl.{ ActorTestKit, TestProbe }
import scala.concurrent.duration.{ FiniteDuration, _ }
import com.typesafe.config.ConfigFactory
class EventsourcedStashReferenceManagementTest extends ActorTestKit with TypedAkkaSpecWithShutdown {
case class Impl() extends EventsourcedStashReferenceManagement
@ -55,28 +56,20 @@ class EventsourcedStashReferenceManagementTest extends ActorTestKit with TypedAk
case _: IncomingCommand[_] Behaviors.stopped
}.receiveSignal {
case (_, signal: Signal)
onSignalCleanup.apply(ctx, signal); Behaviors.stopped[InternalProtocol]
clearStashBuffer()
Behaviors.stopped[InternalProtocol]
}
)
}
}
private def dummySettings(capacity: Int = 42) = new EventsourcedSettings {
private def dummySettings(capacity: Int = 42) =
EventsourcedSettings(
stashCapacity = capacity,
stashOverflowStrategyConfigurator = "akka.persistence.ThrowExceptionConfigurator",
logOnStashing = false,
recoveryEventTimeout = 3.seconds,
journalPluginId = "",
snapshotPluginId = "")
override def stashCapacity: Int = capacity
override def logOnStashing: Boolean = ???
override def stashOverflowStrategyConfigurator: String = ???
override def recoveryEventTimeout: FiniteDuration = ???
override def journalPluginId: String = ???
override def withJournalPluginId(id: String): EventsourcedSettings = ???
override def snapshotPluginId: String = ???
override def withSnapshotPluginId(id: String): EventsourcedSettings = ???
}
}

View file

@ -4,13 +4,11 @@
package akka.persistence.typed.scaladsl
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorRef, Behavior, SupervisorStrategy, TypedAkkaSpecWithShutdown }
import akka.actor.testkit.typed.TE
import akka.persistence.AtomicWrite
import akka.persistence.journal.inmem.InmemJournal
import akka.persistence.typed.EventRejectedException
@ -30,11 +28,11 @@ class ChaosJournal extends InmemJournal {
val pid = messages.head.persistenceId
if (pid == "fail-first-2" && count < 2) {
count += 1
Future.failed(new RuntimeException("database says no"))
Future.failed(TE("database says no"))
} else if (pid == "reject-first" && reject) {
reject = false
Future.successful(messages.map(aw Try {
throw new RuntimeException("I don't like it")
throw TE("I don't like it")
}))
} else {
super.asyncWriteMessages(messages)
@ -44,7 +42,7 @@ class ChaosJournal extends InmemJournal {
override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
if (persistenceId == "fail-recovery-once" && failRecovery) {
failRecovery = false
Future.failed(new RuntimeException("Nah"))
Future.failed(TE("Nah"))
} else {
super.asyncReadHighestSequenceNr(persistenceId, fromSequenceNr)
}

View file

@ -21,11 +21,13 @@ import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.scaladsl._
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.concurrent.Eventually
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.{ Success, Try }
import akka.persistence.journal.inmem.InmemJournal
object PersistentBehaviorSpec {
//#event-wrapper
@ -36,12 +38,12 @@ object PersistentBehaviorSpec {
}
//#event-wrapper
class InMemorySnapshotStore extends SnapshotStore {
class SlowInMemorySnapshotStore extends SnapshotStore {
private var state = Map.empty[String, (Any, SnapshotMetadata)]
def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = {
Future.successful(state.get(persistenceId).map(r SelectedSnapshot(r._2, r._1)))
Promise().future // never completed
}
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = {
@ -63,6 +65,11 @@ object PersistentBehaviorSpec {
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
slow-snapshot-store.class = "${classOf[SlowInMemorySnapshotStore].getName}"
short-recovery-timeout {
class = "${classOf[InmemJournal].getName}"
recovery-event-timeout = 10 millis
}
""")
sealed trait Command
@ -556,6 +563,48 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
taggedEvents shouldEqual List(EventEnvelope(Sequence(1), pid, 1, Wrapper(Incremented(1))))
}
"handle scheduled message arriving before recovery completed " in {
val c = spawn(Behaviors.withTimers[Command] {
timers
timers.startSingleTimer("tick", Increment, 1.millis)
Thread.sleep(30) // now it's probably already in the mailbox, and will be stashed
counter(nextPid)
})
val probe = TestProbe[State]
c ! Increment
probe.awaitAssert {
c ! GetValue(probe.ref)
probe.expectMessage(State(2, Vector(0, 1)))
}
}
"handle scheduled message arriving after recovery completed " in {
val c = spawn(Behaviors.withTimers[Command] {
timers
// probably arrives after recovery completed
timers.startSingleTimer("tick", Increment, 200.millis)
counter(nextPid)
})
val probe = TestProbe[State]
c ! Increment
probe.awaitAssert {
c ! GetValue(probe.ref)
probe.expectMessage(State(2, Vector(0, 1)))
}
}
"fail after recovery timeout" in {
val c = spawn(counter(nextPid)
.withSnapshotPluginId("slow-snapshot-store")
.withJournalPluginId("short-recovery-timeout"))
val probe = TestProbe[State]
probe.expectTerminated(c, probe.remainingOrDefault)
}
def watcher(toWatch: ActorRef[_]): TestProbe[String] = {
val probe = TestProbe[String]()
val w = Behaviors.setup[Any] { (ctx)