Snapshot adapter for typed persistence (#27319)

* WIP

* Snapshot adapter for typed persistence

* Pesky java

* Move no op snapshot adapter to internal package

* remove unused import

* More private

* Formatting

* Add cbor serializeable to messages
This commit is contained in:
Christopher Batey 2019-07-11 16:22:19 +01:00 committed by GitHub
parent 3efc1c2877
commit fe2d1ee917
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 200 additions and 12 deletions

View file

@ -0,0 +1,26 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
/**
* Facility to convert snapshots from and to a specialized data model.
* Can be used when migration from different state types e.g. when migration
* from Persistent FSM to Typed Persistence.
*
* @tparam State The state type of the `EventSourcedBehavior`
*/
trait SnapshotAdapter[State] {
/**
* Transform the state to a different type before sending to the journal.
*/
def toJournal(state: State): Any
/**
* Transform the stored state into the current state type.
* Can be used for migrations from different serialized state types.
*/
def fromJournal(from: Any): State
}

View file

@ -6,7 +6,6 @@ package akka.persistence.typed.internal
import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.actor.Cancellable import akka.actor.Cancellable
import akka.actor.typed.Logger import akka.actor.typed.Logger
import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.ActorContext
@ -14,8 +13,7 @@ import akka.actor.ActorRef
import akka.actor.typed.Signal import akka.actor.typed.Signal
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence._ import akka.persistence._
import akka.persistence.typed.EventAdapter import akka.persistence.typed.{ EventAdapter, PersistenceId, SnapshotAdapter }
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.EventSourcedBehavior import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.RetentionCriteria import akka.persistence.typed.scaladsl.RetentionCriteria
import akka.util.ConstantFun import akka.util.ConstantFun
@ -45,6 +43,7 @@ private[akka] final class BehaviorSetup[C, E, S](
private val signalHandler: PartialFunction[(S, Signal), Unit], private val signalHandler: PartialFunction[(S, Signal), Unit],
val tagger: E => Set[String], val tagger: E => Set[String],
val eventAdapter: EventAdapter[E, Any], val eventAdapter: EventAdapter[E, Any],
val snapshotAdapter: SnapshotAdapter[S],
val snapshotWhen: (S, E, Long) => Boolean, val snapshotWhen: (S, E, Long) => Boolean,
val recovery: Recovery, val recovery: Recovery,
val retention: RetentionCriteria, val retention: RetentionCriteria,
@ -155,7 +154,7 @@ private[akka] object MDC {
val ReplayingEvents = "replay-evts" val ReplayingEvents = "replay-evts"
val RunningCmds = "running-cmnds" val RunningCmds = "running-cmnds"
val PersistingEvents = "persist-evts" val PersistingEvents = "persist-evts"
val StoringSnapshot = "storing-snapshot" val StoringSnapshot = "storing-snapshot"
// format: ON // format: ON
def create(persistenceId: PersistenceId, phaseName: String): Map[String, Any] = { def create(persistenceId: PersistenceId, phaseName: String): Map[String, Any] = {

View file

@ -27,6 +27,7 @@ import akka.persistence.typed.DeleteSnapshotsCompleted
import akka.persistence.typed.DeleteSnapshotsFailed import akka.persistence.typed.DeleteSnapshotsFailed
import akka.persistence.typed.DeletionTarget import akka.persistence.typed.DeletionTarget
import akka.persistence.typed.EventAdapter import akka.persistence.typed.EventAdapter
import akka.persistence.typed.SnapshotAdapter
import akka.persistence.typed.NoOpEventAdapter import akka.persistence.typed.NoOpEventAdapter
import akka.persistence.typed.PersistenceId import akka.persistence.typed.PersistenceId
import akka.persistence.typed.SnapshotCompleted import akka.persistence.typed.SnapshotCompleted
@ -66,6 +67,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
snapshotPluginId: Option[String] = None, snapshotPluginId: Option[String] = None,
tagger: Event => Set[String] = (_: Event) => Set.empty[String], tagger: Event => Set[String] = (_: Event) => Set.empty[String],
eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event], eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event],
snapshotAdapter: SnapshotAdapter[State] = NoOpSnapshotAdapter.instance[State],
snapshotWhen: (State, Event, Long) => Boolean = ConstantFun.scalaAnyThreeToFalse, snapshotWhen: (State, Event, Long) => Boolean = ConstantFun.scalaAnyThreeToFalse,
recovery: Recovery = Recovery(), recovery: Recovery = Recovery(),
retention: RetentionCriteria = RetentionCriteria.disabled, retention: RetentionCriteria = RetentionCriteria.disabled,
@ -122,6 +124,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
actualSignalHandler, actualSignalHandler,
tagger, tagger,
eventAdapter, eventAdapter,
snapshotAdapter,
snapshotWhen, snapshotWhen,
recovery, recovery,
retention, retention,
@ -204,6 +207,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
override def eventAdapter(adapter: EventAdapter[Event, _]): EventSourcedBehavior[Command, Event, State] = override def eventAdapter(adapter: EventAdapter[Event, _]): EventSourcedBehavior[Command, Event, State] =
copy(eventAdapter = adapter.asInstanceOf[EventAdapter[Event, Any]]) copy(eventAdapter = adapter.asInstanceOf[EventAdapter[Event, Any]])
override def snapshotAdapter(adapter: SnapshotAdapter[State]): EventSourcedBehavior[Command, Event, State] =
copy(snapshotAdapter = adapter)
override def onPersistFailure( override def onPersistFailure(
backoffStrategy: BackoffSupervisorStrategy): EventSourcedBehavior[Command, Event, State] = backoffStrategy: BackoffSupervisorStrategy): EventSourcedBehavior[Command, Event, State] =
copy(supervisionStrategy = backoffStrategy) copy(supervisionStrategy = backoffStrategy)

View file

@ -148,7 +148,9 @@ private[akka] trait SnapshotInteractions[C, E, S] {
throw new IllegalStateException("A snapshot must not be a null state.") throw new IllegalStateException("A snapshot must not be a null state.")
else else
setup.snapshotStore.tell( setup.snapshotStore.tell(
SnapshotProtocol.SaveSnapshot(SnapshotMetadata(setup.persistenceId.id, state.seqNr), state.state), SnapshotProtocol.SaveSnapshot(
SnapshotMetadata(setup.persistenceId.id, state.seqNr),
setup.snapshotAdapter.toJournal(state.state)),
setup.selfUntyped) setup.selfUntyped)
} }

View file

@ -0,0 +1,26 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.internal
import akka.annotation.InternalApi
import akka.persistence.typed.SnapshotAdapter
/**
* INTERNAL API
*/
@InternalApi
private[akka] class NoOpSnapshotAdapter extends SnapshotAdapter[Any] {
override def toJournal(state: Any): Any = state
override def fromJournal(from: Any): Any = from
}
/**
* INTERNAL API
*/
@InternalApi
private[akka] object NoOpSnapshotAdapter {
val i = new NoOpSnapshotAdapter
def instance[S]: SnapshotAdapter[S] = i.asInstanceOf[SnapshotAdapter[S]]
}

View file

@ -123,7 +123,7 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
val seqNr: Long = sso match { val seqNr: Long = sso match {
case Some(SelectedSnapshot(metadata, snapshot)) => case Some(SelectedSnapshot(metadata, snapshot)) =>
state = snapshot.asInstanceOf[S] state = setup.snapshotAdapter.fromJournal(snapshot)
metadata.sequenceNr metadata.sequenceNr
case None => 0 // from the beginning please case None => 0 // from the beginning please
} }

View file

@ -155,6 +155,12 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] (
*/ */
def eventAdapter(): EventAdapter[Event, _] = NoOpEventAdapter.instance[Event] def eventAdapter(): EventAdapter[Event, _] = NoOpEventAdapter.instance[Event]
/**
* Transform the state into another type before giving it to and from the journal. Can be used
* to migrate from different state types e.g. when migration from PersistentFSM to Typed Persistence
*/
def snapshotAdapter(): SnapshotAdapter[State] = NoOpSnapshotAdapter.instance[State]
/** /**
* INTERNAL API: DeferredBehavior init * INTERNAL API: DeferredBehavior init
*/ */
@ -178,6 +184,7 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] (
.withRetention(retentionCriteria.asScala) .withRetention(retentionCriteria.asScala)
.withTagger(tagger) .withTagger(tagger)
.eventAdapter(eventAdapter()) .eventAdapter(eventAdapter())
.snapshotAdapter(snapshotAdapter())
.withJournalPluginId(journalPluginId) .withJournalPluginId(journalPluginId)
.withSnapshotPluginId(snapshotPluginId) .withSnapshotPluginId(snapshotPluginId)
.withSnapshotSelectionCriteria(snapshotSelectionCriteria) .withSnapshotSelectionCriteria(snapshotSelectionCriteria)

View file

@ -5,7 +5,6 @@
package akka.persistence.typed.scaladsl package akka.persistence.typed.scaladsl
import scala.annotation.tailrec import scala.annotation.tailrec
import akka.actor.typed.BackoffSupervisorStrategy import akka.actor.typed.BackoffSupervisorStrategy
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior
@ -15,6 +14,7 @@ import akka.actor.typed.internal.LoggerClass
import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.DoNotInherit import akka.annotation.DoNotInherit
import akka.persistence.typed.EventAdapter import akka.persistence.typed.EventAdapter
import akka.persistence.typed.SnapshotAdapter
import akka.persistence.typed.ExpectingReply import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId import akka.persistence.typed.PersistenceId
import akka.persistence.typed.SnapshotSelectionCriteria import akka.persistence.typed.SnapshotSelectionCriteria
@ -183,11 +183,17 @@ object EventSourcedBehavior {
def withTagger(tagger: Event => Set[String]): EventSourcedBehavior[Command, Event, State] def withTagger(tagger: Event => Set[String]): EventSourcedBehavior[Command, Event, State]
/** /**
* Transform the event in another type before giving to the journal. Can be used to wrap events * Transform the event to another type before giving to the journal. Can be used to wrap events
* in types Journals understand but is of a different type than `Event`. * in types Journals understand but is of a different type than `Event`.
*/ */
def eventAdapter(adapter: EventAdapter[Event, _]): EventSourcedBehavior[Command, Event, State] def eventAdapter(adapter: EventAdapter[Event, _]): EventSourcedBehavior[Command, Event, State]
/**
* Transform the state to another type before giving to the journal. Can be used to transform older
* state types into the current state type e.g. when migrating from Persistent FSM to Typed Persistence.
*/
def snapshotAdapter(adapter: SnapshotAdapter[State]): EventSourcedBehavior[Command, Event, State]
/** /**
* Back off strategy for persist failures. * Back off strategy for persist failures.
* *

View file

@ -10,11 +10,8 @@ import akka.actor.typed.ActorRef;
import akka.actor.typed.Scheduler; import akka.actor.typed.Scheduler;
import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Behaviors;
import akka.japi.function.Procedure; import akka.japi.function.Procedure;
import akka.persistence.typed.EventSeq; import akka.persistence.typed.*;
import akka.persistence.typed.SnapshotSelectionCriteria;
import akka.persistence.typed.EventAdapter;
import akka.actor.testkit.typed.javadsl.TestInbox; import akka.actor.testkit.typed.javadsl.TestInbox;
import akka.persistence.typed.PersistenceId;
import java.time.Duration; import java.time.Duration;
import java.util.*; import java.util.*;
@ -120,6 +117,22 @@ public class PersistentActorCompileOnlyTest {
return new EventAdapterExample(); return new EventAdapterExample();
} }
// #install-event-adapter // #install-event-adapter
@Override
public SnapshotAdapter<SimpleState> snapshotAdapter() {
return new SnapshotAdapter<SimpleState>() {
@Override
public Object toJournal(SimpleState simpleState) {
return simpleState;
}
@Override
public SimpleState fromJournal(Object from) {
return (SimpleState) from;
}
};
}
}; };
static class AdditionalSettings static class AdditionalSettings

View file

@ -0,0 +1,103 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.scaladsl
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe }
import akka.actor.typed.ActorRef
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.typed.{ PersistenceId, SnapshotAdapter }
import akka.serialization.jackson.CborSerializable
import akka.stream.ActorMaterializer
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.WordSpecLike
object EventSourcedSnapshotAdapterSpec {
private val conf: Config = ConfigFactory.parseString(s"""
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
""")
case class State(s: String) extends CborSerializable
case class Command(c: String) extends CborSerializable
case class Event(e: String) extends CborSerializable
case class PersistedState(s: String) extends CborSerializable
}
class EventSourcedSnapshotAdapterSpec
extends ScalaTestWithActorTestKit(EventSourcedSnapshotAdapterSpec.conf)
with WordSpecLike {
import EventSourcedSnapshotAdapterSpec._
import akka.actor.typed.scaladsl.adapter._
val pidCounter = new AtomicInteger(0)
private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})")
implicit val materializer = ActorMaterializer()(system.toUntyped)
val queries: LeveldbReadJournal =
PersistenceQuery(system.toUntyped).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
private def behavior(pid: PersistenceId, probe: ActorRef[State]): EventSourcedBehavior[Command, Event, State] =
EventSourcedBehavior[Command, Event, State](
pid,
State(""),
commandHandler = { (state, command) =>
command match {
case Command(c) if c == "shutdown" =>
Effect.stop()
case Command(c) if c == "get" =>
probe.tell(state)
Effect.none
case _ =>
Effect.persist(Event(command.c)).thenRun(newState => probe ! newState)
}
},
eventHandler = { (state, evt) =>
state.copy(s = state.s + "|" + evt.e)
})
"Snapshot adapter" must {
"adapt snapshots to any" in {
val pid = nextPid()
val stateProbe = TestProbe[State]()
val snapshotFromJournal = TestProbe[PersistedState]()
val snapshotToJournal = TestProbe[State]()
val b = behavior(pid, stateProbe.ref)
.snapshotAdapter(new SnapshotAdapter[State]() {
override def toJournal(state: State): Any = {
snapshotToJournal.ref.tell(state)
PersistedState(state.s)
}
override def fromJournal(from: Any): State = from match {
case ps: PersistedState =>
snapshotFromJournal.ref.tell(ps)
State(ps.s)
}
})
.snapshotWhen { (_, event, _) =>
event.e.contains("snapshot")
}
val ref = spawn(b)
ref.tell(Command("one"))
stateProbe.expectMessage(State("|one"))
ref.tell(Command("snapshot now"))
stateProbe.expectMessage(State("|one|snapshot now"))
snapshotToJournal.expectMessage(State("|one|snapshot now"))
ref.tell(Command("shutdown"))
val ref2 = spawn(b)
snapshotFromJournal.expectMessage(PersistedState("|one|snapshot now"))
ref2.tell(Command("get"))
stateProbe.expectMessage(State("|one|snapshot now"))
}
}
}