diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index 9376ccb746..a8bdb36042 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -110,6 +110,9 @@ object Behavior { * // all other kinds of Number will be `unhandled` * } * }}} + * + * Scheduled messages via [[akka.actor.typed.scaladsl.TimerScheduler]] can currently + * not be used together with `widen`, see issue #25318. */ def widen[U](matcher: PartialFunction[U, T]): Behavior[U] = BehaviorImpl.widened(behavior, matcher) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala index c0d3e0fc63..228e139a45 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala @@ -9,9 +9,10 @@ import akka.util.{ ConstantFun, LineNumbers } import akka.annotation.InternalApi import akka.actor.typed.{ ActorContext ⇒ AC } import akka.actor.typed.scaladsl.{ ActorContext ⇒ SAC } - import scala.reflect.ClassTag +import akka.actor.typed.internal.TimerSchedulerImpl.TimerMsg + /** * INTERNAL API */ @@ -45,11 +46,18 @@ import scala.reflect.ClassTag override def receiveSignal(ctx: AC[U], signal: Signal): Behavior[U] = widen(Behavior.interpretSignal(behavior, ctx.as[T], signal), ctx.as[T]) - override def receive(ctx: AC[U], msg: U): Behavior[U] = + override def receive(ctx: AC[U], msg: U): Behavior[U] = { + // widen would wrap the TimerMessage, which would be wrong, see issue #25318 + msg match { + case t: TimerMsg ⇒ throw new IllegalArgumentException( + s"Timers and widen can't be used together, [${t.key}]. See issue #25318") + case _ ⇒ + } matcher.applyOrElse(msg, any2null) match { case null ⇒ unhandled case transformed ⇒ widen(Behavior.interpretMessage(behavior, ctx.as[T], transformed), ctx.as[T]) } + } override def toString: String = s"${behavior.toString}.widen(${LineNumbers(matcher)})" } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala index a54ab0723f..9f236e146a 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala @@ -255,6 +255,9 @@ object Behaviors { * ); * }}} * + * Scheduled messages via [[TimerScheduler]] can currently not be used + * together with `widen`, see issue #25318. + * * @param behavior * the behavior that will receive the selected messages * @param selector diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala index 513f6e04c6..ab187e6143 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala @@ -56,7 +56,8 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set def createBehavior(state: ReplayingState[S]): Behavior[InternalProtocol] = { Behaviors.setup { _ ⇒ - startRecoveryTimer(setup.timers, setup.settings.recoveryEventTimeout) + // protect against event recovery stalling forever because of journal overloaded and such + setup.startRecoveryTimer(snapshot = false) replayEvents(state.seqNr + 1L, state.toSeqNr) @@ -92,8 +93,6 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set } case RecoverySuccess(highestSeqNr) ⇒ setup.log.debug("Recovery successful, recovered until sequenceNr: [{}]", highestSeqNr) - cancelRecoveryTimer(setup.timers) - onRecoveryCompleted(state) case ReplayMessagesFailure(cause) ⇒ @@ -119,7 +118,6 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set if (state.eventSeenInInterval) { stay(state.copy(eventSeenInInterval = false)) } else { - cancelRecoveryTimer(setup.timers) val msg = s"Replay timed out, didn't get event within ]${setup.settings.recoveryEventTimeout}], highest sequence number seen [${state.seqNr}]" onRecoveryFailure(new RecoveryTimedOut(msg), state.seqNr, None) } @@ -142,7 +140,7 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set * @param message the message that was being processed when the exception was thrown */ protected def onRecoveryFailure(cause: Throwable, sequenceNr: Long, message: Option[Any]): Behavior[InternalProtocol] = { - cancelRecoveryTimer(setup.timers) + setup.cancelRecoveryTimer() tryReturnRecoveryPermit("on replay failure: " + cause.getMessage) val msg = message match { @@ -166,14 +164,8 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set tryUnstash(running) } finally { - cancelRecoveryTimer(setup.timers) + setup.cancelRecoveryTimer() } - // protect against event recovery stalling forever because of journal overloaded and such - private val EventRecoveryTickTimerKey = "event-recovery-tick" - private def startRecoveryTimer(timers: TimerScheduler[InternalProtocol], timeout: FiniteDuration): Unit = - timers.startPeriodicTimer(EventRecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), timeout) - private def cancelRecoveryTimer(timers: TimerScheduler[InternalProtocol]): Unit = timers.cancel(EventRecoveryTickTimerKey) - } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingSnapshot.scala index c550ed8db2..18152fbb2c 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingSnapshot.scala @@ -41,7 +41,8 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] { def createBehavior(): Behavior[InternalProtocol] = { - startRecoveryTimer() + // protect against snapshot stalling forever because of journal overloaded and such + setup.startRecoveryTimer(snapshot = true) loadSnapshot(setup.recovery.fromSnapshot, setup.recovery.toSequenceNr) @@ -63,7 +64,7 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E * @param event the event that was processed in `receiveRecover`, if the exception was thrown there */ private def onRecoveryFailure(cause: Throwable, event: Option[Any]): Behavior[InternalProtocol] = { - cancelRecoveryTimer(setup.timers) + setup.cancelRecoveryTimer() event match { case Some(evt) ⇒ @@ -108,7 +109,6 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E becomeReplayingEvents(state, seqNr, toSnr) case LoadSnapshotFailed(cause) ⇒ - cancelRecoveryTimer(setup.timers) onRecoveryFailure(cause, event = None) case _ ⇒ @@ -117,7 +117,7 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E } private def becomeReplayingEvents(state: S, lastSequenceNr: Long, toSnr: Long): Behavior[InternalProtocol] = { - cancelRecoveryTimer(setup.timers) + setup.cancelRecoveryTimer() EventsourcedReplayingEvents[C, E, S]( setup, @@ -125,10 +125,4 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E ) } - // protect against snapshot stalling forever because of journal overloaded and such - private val SnapRecoveryTickTimerKey = "snapshot-recovery-tick" - private def startRecoveryTimer(): Unit = - setup.timers.startPeriodicTimer(SnapRecoveryTickTimerKey, RecoveryTickEvent(snapshot = true), setup.settings.recoveryEventTimeout) - private def cancelRecoveryTimer(timers: TimerScheduler[_]): Unit = timers.cancel(SnapRecoveryTickTimerKey) - } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala index e1c98dd1df..60652fd643 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala @@ -13,31 +13,15 @@ import com.typesafe.config.Config import scala.concurrent.duration._ -/** INTERNAL API */ -@InternalApi -private[akka] trait EventsourcedSettings { - - def stashCapacity: Int - def logOnStashing: Boolean - def stashOverflowStrategyConfigurator: String - - def recoveryEventTimeout: FiniteDuration - - def journalPluginId: String - def withJournalPluginId(id: String): EventsourcedSettings - def snapshotPluginId: String - def withSnapshotPluginId(id: String): EventsourcedSettings -} - /** * INTERNAL API */ @InternalApi private[akka] object EventsourcedSettings { - def apply(system: ActorSystem[_]): EventsourcedSettings = - apply(system.settings.config) + def apply(system: ActorSystem[_], journalPluginId: String, snapshotPluginId: String): EventsourcedSettings = + apply(system.settings.config, journalPluginId, snapshotPluginId) - def apply(config: Config): EventsourcedSettings = { + def apply(config: Config, journalPluginId: String, snapshotPluginId: String): EventsourcedSettings = { val typedConfig = config.getConfig("akka.persistence.typed") // StashOverflowStrategy @@ -48,13 +32,17 @@ private[akka] trait EventsourcedSettings { val logOnStashing = typedConfig.getBoolean("log-stashing") - EventsourcedSettingsImpl( - config, + val journalConfig = journalConfigFor(config, journalPluginId) + val recoveryEventTimeout: FiniteDuration = + journalConfig.getDuration("recovery-event-timeout", TimeUnit.MILLISECONDS).millis + + EventsourcedSettings( stashCapacity = stashCapacity, stashOverflowStrategyConfigurator, logOnStashing = logOnStashing, - journalPluginId = "", - snapshotPluginId = "" + recoveryEventTimeout, + journalPluginId, + snapshotPluginId ) } @@ -71,26 +59,16 @@ private[akka] trait EventsourcedSettings { } @InternalApi -private[persistence] final case class EventsourcedSettingsImpl( - private val config: Config, +private[akka] final case class EventsourcedSettings( stashCapacity: Int, stashOverflowStrategyConfigurator: String, logOnStashing: Boolean, + recoveryEventTimeout: FiniteDuration, journalPluginId: String, - snapshotPluginId: String -) extends EventsourcedSettings { + snapshotPluginId: String) { - def withJournalPluginId(id: String): EventsourcedSettings = { - require(id != null, "journal plugin id must not be null; use empty string for 'default' journal") - copy(journalPluginId = id) - } - def withSnapshotPluginId(id: String): EventsourcedSettings = { - require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store") - copy(snapshotPluginId = id) - } - - private val journalConfig = EventsourcedSettings.journalConfigFor(config, journalPluginId) - val recoveryEventTimeout = journalConfig.getDuration("recovery-event-timeout", TimeUnit.MILLISECONDS).millis + require(journalPluginId != null, "journal plugin id must not be null; use empty string for 'default' journal") + require(snapshotPluginId != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store") } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala index 9f23f27325..e458300343 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala @@ -4,6 +4,8 @@ package akka.persistence.typed.internal +import scala.concurrent.ExecutionContext + import akka.Done import akka.actor.typed.Logger import akka.actor.{ ActorRef, ExtendedActorSystem } @@ -16,16 +18,17 @@ import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, import akka.persistence.typed.scaladsl.PersistentBehaviors import akka.util.Collections.EmptyImmutableSeq import akka.util.OptionVal - import scala.util.Try +import akka.actor.Cancellable +import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.RecoveryTickEvent + /** * INTERNAL API: Carry state for the Persistent behavior implementation behaviors */ @InternalApi private[persistence] final class EventsourcedSetup[C, E, S]( val context: ActorContext[InternalProtocol], - val timers: TimerScheduler[InternalProtocol], val persistenceId: String, val emptyState: S, val commandHandler: PersistentBehaviors.CommandHandler[C, E, S], @@ -83,5 +86,28 @@ private[persistence] final class EventsourcedSetup[C, E, S]( this } + private var recoveryTimer: OptionVal[Cancellable] = OptionVal.None + + def startRecoveryTimer(snapshot: Boolean): Unit = { + cancelRecoveryTimer() + implicit val ec: ExecutionContext = context.executionContext + val timer = + if (snapshot) + context.system.scheduler.scheduleOnce(settings.recoveryEventTimeout, context.self.toUntyped, + RecoveryTickEvent(snapshot = true)) + else + context.system.scheduler.schedule(settings.recoveryEventTimeout, settings.recoveryEventTimeout, + context.self.toUntyped, RecoveryTickEvent(snapshot = false)) + recoveryTimer = OptionVal.Some(timer) + } + + def cancelRecoveryTimer(): Unit = { + recoveryTimer match { + case OptionVal.Some(t) ⇒ t.cancel() + case OptionVal.None ⇒ + } + recoveryTimer = OptionVal.None + } + } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagement.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagement.scala index 9087c7be9f..bbca1249aa 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagement.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagement.scala @@ -4,18 +4,17 @@ package akka.persistence.typed.internal -import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer } -import akka.actor.typed.{ PostStop, Signal } +import akka.actor.typed.scaladsl.StashBuffer import akka.annotation.InternalApi import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol import akka.util.OptionVal /** + * INTERNAL API * Main reason for introduction of this trait is stash buffer reference management * in order to survive restart of internal behavior */ -@InternalApi -trait EventsourcedStashReferenceManagement { +@InternalApi private[akka] trait EventsourcedStashReferenceManagement { private var stashBuffer: OptionVal[StashBuffer[InternalProtocol]] = OptionVal.None @@ -28,12 +27,5 @@ trait EventsourcedStashReferenceManagement { stashBuffer.get } - def onSignalCleanup: (ActorContext[InternalProtocol], Signal) ⇒ Unit = { - case (ctx, PostStop) ⇒ - stashBuffer match { - case OptionVal.Some(buffer) ⇒ buffer.unstashAll(ctx, Behaviors.ignore) - case _ ⇒ Unit - } - case _ ⇒ Unit - } + def clearStashBuffer(): Unit = stashBuffer = OptionVal.None } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala index 5431bc8f7b..5ac1c9ff83 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala @@ -14,9 +14,10 @@ import akka.persistence.typed.{ EventAdapter, NoOpEventAdapter } import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity } import akka.persistence.typed.scaladsl._ import akka.util.ConstantFun - import scala.util.{ Failure, Success, Try } +import akka.actor.typed.PostStop + @InternalApi private[akka] object PersistentBehaviorImpl { @@ -40,7 +41,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( snapshotPluginId: Option[String] = None, recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = ConstantFun.scalaAnyTwoToUnit, tagger: Event ⇒ Set[String] = (_: Event) ⇒ Set.empty[String], - eventAdapter: EventAdapter[Event, _] = NoOpEventAdapter.instance[Event], + eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event], snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse, recovery: Recovery = Recovery(), supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop, @@ -48,43 +49,47 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( ) extends PersistentBehavior[Command, Event, State] with EventsourcedStashReferenceManagement { override def apply(context: typed.ActorContext[Command]): Behavior[Command] = { - Behaviors.supervise( - Behaviors.setup[EventsourcedBehavior.InternalProtocol] { ctx ⇒ - Behaviors.withTimers { timers ⇒ - val settings = EventsourcedSettings(ctx.system) - val internalStash = stashBuffer(settings) - Behaviors.tap( - onMessage = (_, _) ⇒ Unit, - onSignal = onSignalCleanup, - behavior = { - val setup = new EventsourcedSetup( - ctx, - timers, - persistenceId, - emptyState, - commandHandler, - eventHandler, - WriterIdentity.newIdentity(), - recoveryCompleted, - onSnapshot, - tagger, - eventAdapter, - snapshotWhen, - recovery, - holdingRecoveryPermit = false, - settings = settings, - internalStash = internalStash - ) - EventsourcedRequestingRecoveryPermit(setup) - } - ) - } + Behaviors.supervise { + Behaviors.setup[InternalProtocol] { ctx ⇒ + val settings = EventsourcedSettings(ctx.system, journalPluginId.getOrElse(""), snapshotPluginId.getOrElse("")) + + val internalStash = stashBuffer(settings) + + val eventsourcedSetup = new EventsourcedSetup( + ctx, + persistenceId, + emptyState, + commandHandler, + eventHandler, + WriterIdentity.newIdentity(), + recoveryCompleted, + onSnapshot, + tagger, + eventAdapter, + snapshotWhen, + recovery, + holdingRecoveryPermit = false, + settings = settings, + internalStash = internalStash + ) + + Behaviors.tap(EventsourcedRequestingRecoveryPermit(eventsourcedSetup))( + onMessage = (_, _) ⇒ Unit, + onSignal = { + case (_, PostStop) ⇒ + eventsourcedSetup.cancelRecoveryTimer() + clearStashBuffer() + case _ ⇒ + }) + }.widen[Any] { case res: JournalProtocol.Response ⇒ InternalProtocol.JournalResponse(res) case res: SnapshotProtocol.Response ⇒ InternalProtocol.SnapshotterResponse(res) case RecoveryPermitter.RecoveryPermitGranted ⇒ InternalProtocol.RecoveryPermitGranted + case internal: InternalProtocol ⇒ internal // such as RecoveryTickEvent case cmd: Command @unchecked ⇒ InternalProtocol.IncomingCommand(cmd) - }.narrow[Command]).onFailure[JournalFailureException](supervisionStrategy) + }.narrow[Command] + }.onFailure[JournalFailureException](supervisionStrategy) } /** @@ -153,7 +158,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( * the journal understands */ def eventAdapter(adapter: EventAdapter[Event, _]): PersistentBehavior[Command, Event, State] = - copy(eventAdapter = adapter) + copy(eventAdapter = adapter.asInstanceOf[EventAdapter[Event, Any]]) /** * The `callback` function is called to notify the actor that a snapshot has finished diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagementTest.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagementTest.scala index 45d0ff3aac..88a183d5fb 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagementTest.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagementTest.scala @@ -9,9 +9,10 @@ import akka.actor.typed.{ Behavior, Signal, TypedAkkaSpecWithShutdown } import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.{ IncomingCommand, RecoveryPermitGranted } import akka.actor.testkit.typed.scaladsl.{ ActorTestKit, TestProbe } - import scala.concurrent.duration.{ FiniteDuration, _ } +import com.typesafe.config.ConfigFactory + class EventsourcedStashReferenceManagementTest extends ActorTestKit with TypedAkkaSpecWithShutdown { case class Impl() extends EventsourcedStashReferenceManagement @@ -55,28 +56,20 @@ class EventsourcedStashReferenceManagementTest extends ActorTestKit with TypedAk case _: IncomingCommand[_] ⇒ Behaviors.stopped }.receiveSignal { case (_, signal: Signal) ⇒ - onSignalCleanup.apply(ctx, signal); Behaviors.stopped[InternalProtocol] + clearStashBuffer() + Behaviors.stopped[InternalProtocol] } ) } } - private def dummySettings(capacity: Int = 42) = new EventsourcedSettings { + private def dummySettings(capacity: Int = 42) = + EventsourcedSettings( + stashCapacity = capacity, + stashOverflowStrategyConfigurator = "akka.persistence.ThrowExceptionConfigurator", + logOnStashing = false, + recoveryEventTimeout = 3.seconds, + journalPluginId = "", + snapshotPluginId = "") - override def stashCapacity: Int = capacity - - override def logOnStashing: Boolean = ??? - - override def stashOverflowStrategyConfigurator: String = ??? - - override def recoveryEventTimeout: FiniteDuration = ??? - - override def journalPluginId: String = ??? - - override def withJournalPluginId(id: String): EventsourcedSettings = ??? - - override def snapshotPluginId: String = ??? - - override def withSnapshotPluginId(id: String): EventsourcedSettings = ??? - } } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala index 38a56aac71..76477a32ad 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala @@ -4,13 +4,11 @@ package akka.persistence.typed.scaladsl -import java.util.concurrent.atomic.AtomicInteger - -import akka.Done import akka.actor.testkit.typed.TestKitSettings import akka.actor.testkit.typed.scaladsl._ import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.{ ActorRef, Behavior, SupervisorStrategy, TypedAkkaSpecWithShutdown } +import akka.actor.testkit.typed.TE import akka.persistence.AtomicWrite import akka.persistence.journal.inmem.InmemJournal import akka.persistence.typed.EventRejectedException @@ -30,11 +28,11 @@ class ChaosJournal extends InmemJournal { val pid = messages.head.persistenceId if (pid == "fail-first-2" && count < 2) { count += 1 - Future.failed(new RuntimeException("database says no")) + Future.failed(TE("database says no")) } else if (pid == "reject-first" && reject) { reject = false Future.successful(messages.map(aw ⇒ Try { - throw new RuntimeException("I don't like it") + throw TE("I don't like it") })) } else { super.asyncWriteMessages(messages) @@ -44,7 +42,7 @@ class ChaosJournal extends InmemJournal { override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = { if (persistenceId == "fail-recovery-once" && failRecovery) { failRecovery = false - Future.failed(new RuntimeException("Nah")) + Future.failed(TE("Nah")) } else { super.asyncReadHighestSequenceNr(persistenceId, fromSequenceNr) } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala index 72feb718de..8796ebb7c4 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala @@ -21,11 +21,13 @@ import akka.actor.testkit.typed.TestKitSettings import akka.actor.testkit.typed.scaladsl._ import com.typesafe.config.{ Config, ConfigFactory } import org.scalatest.concurrent.Eventually - import scala.concurrent.Future +import scala.concurrent.Promise import scala.concurrent.duration._ import scala.util.{ Success, Try } +import akka.persistence.journal.inmem.InmemJournal + object PersistentBehaviorSpec { //#event-wrapper @@ -36,12 +38,12 @@ object PersistentBehaviorSpec { } //#event-wrapper - class InMemorySnapshotStore extends SnapshotStore { + class SlowInMemorySnapshotStore extends SnapshotStore { private var state = Map.empty[String, (Any, SnapshotMetadata)] def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = { - Future.successful(state.get(persistenceId).map(r ⇒ SelectedSnapshot(r._2, r._1))) + Promise().future // never completed } def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = { @@ -63,6 +65,11 @@ object PersistentBehaviorSpec { akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" akka.persistence.snapshot-store.local.dir = "target/typed-persistence-${UUID.randomUUID().toString}" + slow-snapshot-store.class = "${classOf[SlowInMemorySnapshotStore].getName}" + short-recovery-timeout { + class = "${classOf[InmemJournal].getName}" + recovery-event-timeout = 10 millis + } """) sealed trait Command @@ -556,6 +563,48 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown taggedEvents shouldEqual List(EventEnvelope(Sequence(1), pid, 1, Wrapper(Incremented(1)))) } + "handle scheduled message arriving before recovery completed " in { + val c = spawn(Behaviors.withTimers[Command] { + timers ⇒ + timers.startSingleTimer("tick", Increment, 1.millis) + Thread.sleep(30) // now it's probably already in the mailbox, and will be stashed + counter(nextPid) + }) + + val probe = TestProbe[State] + c ! Increment + probe.awaitAssert { + c ! GetValue(probe.ref) + probe.expectMessage(State(2, Vector(0, 1))) + } + } + + "handle scheduled message arriving after recovery completed " in { + val c = spawn(Behaviors.withTimers[Command] { + timers ⇒ + // probably arrives after recovery completed + timers.startSingleTimer("tick", Increment, 200.millis) + counter(nextPid) + }) + + val probe = TestProbe[State] + c ! Increment + probe.awaitAssert { + c ! GetValue(probe.ref) + probe.expectMessage(State(2, Vector(0, 1))) + } + } + + "fail after recovery timeout" in { + val c = spawn(counter(nextPid) + .withSnapshotPluginId("slow-snapshot-store") + .withJournalPluginId("short-recovery-timeout")) + + val probe = TestProbe[State] + + probe.expectTerminated(c, probe.remainingOrDefault) + } + def watcher(toWatch: ActorRef[_]): TestProbe[String] = { val probe = TestProbe[String]() val w = Behaviors.setup[Any] { (ctx) ⇒