diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/SnapshotAdapter.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/SnapshotAdapter.scala new file mode 100644 index 0000000000..e242b6c28f --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/SnapshotAdapter.scala @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.persistence.typed + +/** + * Facility to convert snapshots from and to a specialized data model. + * Can be used when migration from different state types e.g. when migration + * from Persistent FSM to Typed Persistence. + * + * @tparam State The state type of the `EventSourcedBehavior` + */ +trait SnapshotAdapter[State] { + + /** + * Transform the state to a different type before sending to the journal. + */ + def toJournal(state: State): Any + + /** + * Transform the stored state into the current state type. + * Can be used for migrations from different serialized state types. + */ + def fromJournal(from: Any): State +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala index 2c2be2083b..ab42d9ee68 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala @@ -6,7 +6,6 @@ package akka.persistence.typed.internal import scala.concurrent.ExecutionContext import scala.util.control.NonFatal - import akka.actor.Cancellable import akka.actor.typed.Logger import akka.actor.typed.scaladsl.ActorContext @@ -14,8 +13,7 @@ import akka.actor.ActorRef import akka.actor.typed.Signal import akka.annotation.InternalApi import akka.persistence._ -import akka.persistence.typed.EventAdapter -import akka.persistence.typed.PersistenceId +import akka.persistence.typed.{ EventAdapter, PersistenceId, SnapshotAdapter } import akka.persistence.typed.scaladsl.EventSourcedBehavior import akka.persistence.typed.scaladsl.RetentionCriteria import akka.util.ConstantFun @@ -45,6 +43,7 @@ private[akka] final class BehaviorSetup[C, E, S]( private val signalHandler: PartialFunction[(S, Signal), Unit], val tagger: E => Set[String], val eventAdapter: EventAdapter[E, Any], + val snapshotAdapter: SnapshotAdapter[S], val snapshotWhen: (S, E, Long) => Boolean, val recovery: Recovery, val retention: RetentionCriteria, @@ -155,7 +154,7 @@ private[akka] object MDC { val ReplayingEvents = "replay-evts" val RunningCmds = "running-cmnds" val PersistingEvents = "persist-evts" - val StoringSnapshot = "storing-snapshot" + val StoringSnapshot = "storing-snapshot" // format: ON def create(persistenceId: PersistenceId, phaseName: String): Map[String, Any] = { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index c68e08ca2b..aa3eb65b37 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -27,6 +27,7 @@ import akka.persistence.typed.DeleteSnapshotsCompleted import akka.persistence.typed.DeleteSnapshotsFailed import akka.persistence.typed.DeletionTarget import akka.persistence.typed.EventAdapter +import akka.persistence.typed.SnapshotAdapter import akka.persistence.typed.NoOpEventAdapter import akka.persistence.typed.PersistenceId import akka.persistence.typed.SnapshotCompleted @@ -66,6 +67,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( snapshotPluginId: Option[String] = None, tagger: Event => Set[String] = (_: Event) => Set.empty[String], eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event], + snapshotAdapter: SnapshotAdapter[State] = NoOpSnapshotAdapter.instance[State], snapshotWhen: (State, Event, Long) => Boolean = ConstantFun.scalaAnyThreeToFalse, recovery: Recovery = Recovery(), retention: RetentionCriteria = RetentionCriteria.disabled, @@ -122,6 +124,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( actualSignalHandler, tagger, eventAdapter, + snapshotAdapter, snapshotWhen, recovery, retention, @@ -204,6 +207,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( override def eventAdapter(adapter: EventAdapter[Event, _]): EventSourcedBehavior[Command, Event, State] = copy(eventAdapter = adapter.asInstanceOf[EventAdapter[Event, Any]]) + override def snapshotAdapter(adapter: SnapshotAdapter[State]): EventSourcedBehavior[Command, Event, State] = + copy(snapshotAdapter = adapter) + override def onPersistFailure( backoffStrategy: BackoffSupervisorStrategy): EventSourcedBehavior[Command, Event, State] = copy(supervisionStrategy = backoffStrategy) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala index 9b613d3084..ff7bb3c23f 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala @@ -148,7 +148,9 @@ private[akka] trait SnapshotInteractions[C, E, S] { throw new IllegalStateException("A snapshot must not be a null state.") else setup.snapshotStore.tell( - SnapshotProtocol.SaveSnapshot(SnapshotMetadata(setup.persistenceId.id, state.seqNr), state.state), + SnapshotProtocol.SaveSnapshot( + SnapshotMetadata(setup.persistenceId.id, state.seqNr), + setup.snapshotAdapter.toJournal(state.state)), setup.selfUntyped) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/NoOpSnapshotAdapter.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/NoOpSnapshotAdapter.scala new file mode 100644 index 0000000000..57b6667ed1 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/NoOpSnapshotAdapter.scala @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.persistence.typed.internal + +import akka.annotation.InternalApi +import akka.persistence.typed.SnapshotAdapter + +/** + * INTERNAL API + */ +@InternalApi +private[akka] class NoOpSnapshotAdapter extends SnapshotAdapter[Any] { + override def toJournal(state: Any): Any = state + override def fromJournal(from: Any): Any = from +} + +/** + * INTERNAL API + */ +@InternalApi +private[akka] object NoOpSnapshotAdapter { + val i = new NoOpSnapshotAdapter + def instance[S]: SnapshotAdapter[S] = i.asInstanceOf[SnapshotAdapter[S]] +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index 793b00f034..fd56dbca91 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -123,7 +123,7 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup val seqNr: Long = sso match { case Some(SelectedSnapshot(metadata, snapshot)) => - state = snapshot.asInstanceOf[S] + state = setup.snapshotAdapter.fromJournal(snapshot) metadata.sequenceNr case None => 0 // from the beginning please } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala index 5d69914136..ed039ec669 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala @@ -155,6 +155,12 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] ( */ def eventAdapter(): EventAdapter[Event, _] = NoOpEventAdapter.instance[Event] + /** + * Transform the state into another type before giving it to and from the journal. Can be used + * to migrate from different state types e.g. when migration from PersistentFSM to Typed Persistence + */ + def snapshotAdapter(): SnapshotAdapter[State] = NoOpSnapshotAdapter.instance[State] + /** * INTERNAL API: DeferredBehavior init */ @@ -178,6 +184,7 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] ( .withRetention(retentionCriteria.asScala) .withTagger(tagger) .eventAdapter(eventAdapter()) + .snapshotAdapter(snapshotAdapter()) .withJournalPluginId(journalPluginId) .withSnapshotPluginId(snapshotPluginId) .withSnapshotSelectionCriteria(snapshotSelectionCriteria) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala index 99c6f13af6..feaef49921 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala @@ -5,7 +5,6 @@ package akka.persistence.typed.scaladsl import scala.annotation.tailrec - import akka.actor.typed.BackoffSupervisorStrategy import akka.actor.typed.Behavior import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior @@ -15,6 +14,7 @@ import akka.actor.typed.internal.LoggerClass import akka.actor.typed.scaladsl.ActorContext import akka.annotation.DoNotInherit import akka.persistence.typed.EventAdapter +import akka.persistence.typed.SnapshotAdapter import akka.persistence.typed.ExpectingReply import akka.persistence.typed.PersistenceId import akka.persistence.typed.SnapshotSelectionCriteria @@ -183,11 +183,17 @@ object EventSourcedBehavior { def withTagger(tagger: Event => Set[String]): EventSourcedBehavior[Command, Event, State] /** - * Transform the event in another type before giving to the journal. Can be used to wrap events + * Transform the event to another type before giving to the journal. Can be used to wrap events * in types Journals understand but is of a different type than `Event`. */ def eventAdapter(adapter: EventAdapter[Event, _]): EventSourcedBehavior[Command, Event, State] + /** + * Transform the state to another type before giving to the journal. Can be used to transform older + * state types into the current state type e.g. when migrating from Persistent FSM to Typed Persistence. + */ + def snapshotAdapter(adapter: SnapshotAdapter[State]): EventSourcedBehavior[Command, Event, State] + /** * Back off strategy for persist failures. * diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java index c564f27810..ec74866c85 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java @@ -10,11 +10,8 @@ import akka.actor.typed.ActorRef; import akka.actor.typed.Scheduler; import akka.actor.typed.javadsl.Behaviors; import akka.japi.function.Procedure; -import akka.persistence.typed.EventSeq; -import akka.persistence.typed.SnapshotSelectionCriteria; -import akka.persistence.typed.EventAdapter; +import akka.persistence.typed.*; import akka.actor.testkit.typed.javadsl.TestInbox; -import akka.persistence.typed.PersistenceId; import java.time.Duration; import java.util.*; @@ -120,6 +117,22 @@ public class PersistentActorCompileOnlyTest { return new EventAdapterExample(); } // #install-event-adapter + + @Override + public SnapshotAdapter snapshotAdapter() { + return new SnapshotAdapter() { + + @Override + public Object toJournal(SimpleState simpleState) { + return simpleState; + } + + @Override + public SimpleState fromJournal(Object from) { + return (SimpleState) from; + } + }; + } }; static class AdditionalSettings diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSnapshotAdapterSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSnapshotAdapterSpec.scala new file mode 100644 index 0000000000..bbb452360d --- /dev/null +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSnapshotAdapterSpec.scala @@ -0,0 +1,103 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.persistence.typed.scaladsl + +import java.util.UUID +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe } +import akka.actor.typed.ActorRef +import akka.persistence.query.PersistenceQuery +import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal +import akka.persistence.typed.{ PersistenceId, SnapshotAdapter } +import akka.serialization.jackson.CborSerializable +import akka.stream.ActorMaterializer +import com.typesafe.config.{ Config, ConfigFactory } +import org.scalatest.WordSpecLike + +object EventSourcedSnapshotAdapterSpec { + private val conf: Config = ConfigFactory.parseString(s""" + akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}" + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/typed-persistence-${UUID.randomUUID().toString}" + """) + case class State(s: String) extends CborSerializable + case class Command(c: String) extends CborSerializable + case class Event(e: String) extends CborSerializable + case class PersistedState(s: String) extends CborSerializable +} + +class EventSourcedSnapshotAdapterSpec + extends ScalaTestWithActorTestKit(EventSourcedSnapshotAdapterSpec.conf) + with WordSpecLike { + import EventSourcedSnapshotAdapterSpec._ + import akka.actor.typed.scaladsl.adapter._ + + val pidCounter = new AtomicInteger(0) + private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})") + implicit val materializer = ActorMaterializer()(system.toUntyped) + val queries: LeveldbReadJournal = + PersistenceQuery(system.toUntyped).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier) + + private def behavior(pid: PersistenceId, probe: ActorRef[State]): EventSourcedBehavior[Command, Event, State] = + EventSourcedBehavior[Command, Event, State]( + pid, + State(""), + commandHandler = { (state, command) => + command match { + case Command(c) if c == "shutdown" => + Effect.stop() + case Command(c) if c == "get" => + probe.tell(state) + Effect.none + case _ => + Effect.persist(Event(command.c)).thenRun(newState => probe ! newState) + } + }, + eventHandler = { (state, evt) => + state.copy(s = state.s + "|" + evt.e) + }) + + "Snapshot adapter" must { + + "adapt snapshots to any" in { + val pid = nextPid() + val stateProbe = TestProbe[State]() + val snapshotFromJournal = TestProbe[PersistedState]() + val snapshotToJournal = TestProbe[State]() + val b = behavior(pid, stateProbe.ref) + .snapshotAdapter(new SnapshotAdapter[State]() { + override def toJournal(state: State): Any = { + snapshotToJournal.ref.tell(state) + PersistedState(state.s) + } + override def fromJournal(from: Any): State = from match { + case ps: PersistedState => + snapshotFromJournal.ref.tell(ps) + State(ps.s) + } + }) + .snapshotWhen { (_, event, _) => + event.e.contains("snapshot") + } + + val ref = spawn(b) + + ref.tell(Command("one")) + stateProbe.expectMessage(State("|one")) + ref.tell(Command("snapshot now")) + stateProbe.expectMessage(State("|one|snapshot now")) + snapshotToJournal.expectMessage(State("|one|snapshot now")) + ref.tell(Command("shutdown")) + + val ref2 = spawn(b) + snapshotFromJournal.expectMessage(PersistedState("|one|snapshot now")) + ref2.tell(Command("get")) + stateProbe.expectMessage(State("|one|snapshot now")) + } + + } +}