diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 95aa904226..6fbef62c99 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -342,7 +342,7 @@ private[akka] object Running { setup.log.debug("Discarding message [{}], because actor is to be stopped.", cmd) Behaviors.unhandled } else { - stashUser(cmd) + stashInternal(cmd) Behaviors.same } } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/StashingWhenSnapshottingSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/StashingWhenSnapshottingSpec.scala new file mode 100644 index 0000000000..724119525e --- /dev/null +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/StashingWhenSnapshottingSpec.scala @@ -0,0 +1,104 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.persistence.typed + +import java.util.concurrent.CyclicBarrier + +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.actor.typed.ActorSystem +import akka.actor.typed.Extension +import akka.actor.typed.ExtensionId +import akka.persistence +import akka.persistence.SelectedSnapshot +import akka.persistence.snapshot.SnapshotStore +import com.typesafe.config.ConfigFactory +import akka.actor.typed.scaladsl.adapter._ +import akka.persistence.typed.StashingWhenSnapshottingSpec.ControllableSnapshotStoreExt +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import org.scalatest.WordSpecLike + +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.util.Success + +object StashingWhenSnapshottingSpec { + object ControllableSnapshotStoreExt extends ExtensionId[ControllableSnapshotStoreExt] { + + override def createExtension(system: ActorSystem[_]): ControllableSnapshotStoreExt = + new ControllableSnapshotStoreExt() + } + + class ControllableSnapshotStoreExt extends Extension { + val completeSnapshotWrite = Promise[Unit]() + val snapshotWriteStarted = new CyclicBarrier(2) + } + + class ControllableSnapshotStore extends SnapshotStore { + override def loadAsync( + persistenceId: String, + criteria: persistence.SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = Future.successful(None) + + override def saveAsync(metadata: persistence.SnapshotMetadata, snapshot: Any): Future[Unit] = { + ControllableSnapshotStoreExt(context.system.toTyped).snapshotWriteStarted.await() + ControllableSnapshotStoreExt(context.system.toTyped).completeSnapshotWrite.future + } + override def deleteAsync(metadata: persistence.SnapshotMetadata): Future[Unit] = Future.successful(()) + override def deleteAsync(persistenceId: String, criteria: persistence.SnapshotSelectionCriteria): Future[Unit] = + Future.successful(()) + } + val config = ConfigFactory.parseString(s""" + slow-snapshot { + class = "akka.persistence.typed.StashingWhenSnapshottingSpec$$ControllableSnapshotStore" + } + akka.actor.allow-java-serialization = on + akka { + loglevel = "INFO" + + persistence { + journal { + plugin = "akka.persistence.journal.inmem" + auto-start-journals = [] + } + + snapshot-store { + plugin = "slow-snapshot" + auto-start-journals = [] + } + } + } + """) + + def persistentTestBehavior(pid: PersistenceId, eventProbe: TestProbe[String]) = + EventSourcedBehavior[String, String, List[String]]( + pid, + Nil, + (_, command) => Effect.persist(command), + (state, event) => { + eventProbe.ref.tell(event) + event :: state + }).snapshotWhen((_, event, _) => event.contains("snap")) +} + +class StashingWhenSnapshottingSpec + extends ScalaTestWithActorTestKit(StashingWhenSnapshottingSpec.config) + with WordSpecLike { + "A persistent actor" should { + "stash messages and automatically replay when snapshot is in progress" in { + val eventProbe = TestProbe[String]() + val persistentActor = spawn(StashingWhenSnapshottingSpec.persistentTestBehavior(PersistenceId("1"), eventProbe)) + persistentActor ! "one" + eventProbe.expectMessage("one") + persistentActor ! "snap" + eventProbe.expectMessage("snap") + ControllableSnapshotStoreExt(system).snapshotWriteStarted.await() + persistentActor ! "two" + eventProbe.expectNoMessage() // snapshot in progress + ControllableSnapshotStoreExt(system).completeSnapshotWrite.complete(Success(())) + eventProbe.expectMessage("two") + } + } +}